响应式编程初探-自定义实现Reactive Streams规范

news2025/1/19 23:04:25

最近在学响应式编程,这里先记录下,响应式编程的一些基础内容

1.名词解释

Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。

  1. Reactive Streams:

    • Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
    • Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 java.util.concurrent.Flow 类,定义了Reactive Streams规范。
  2. Reactor:

    • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
    • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。
  3. WebFlux:

    • WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
    • Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
  4. 响应式编程:

    • 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
    • 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。

综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。

2.Reactive Streams 规范

2.1.Reactive Streams规范定义

java.util.concurrent.Flow 类中,定义了Reactive Streams规范
在这里插入图片描述

  • Publisher(发布者):负责生成数据流,并向订阅者发送数据。
  • Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
  • Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
  • Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。

2.2.API方法

1. Publisher(发布者):
interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}
  • subscribe(Subscriber<? super T> subscriber) 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}
  • onSubscribe(Subscription subscription) 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。

  • onNext(T item) 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。

  • onError(Throwable throwable) 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。

  • onComplete() 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。

3. Subscription(订阅):
interface Subscription {
    void request(long n);
    void cancel();
}
  • request(long n) 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。

  • cancel() 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。

4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 接口是 SubscriberPublisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。

  • Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()

  • Publisher 部分的方法:subscribe(Subscriber<? super R> subscriber)。表示 Processor 可以被其他订阅者订阅。

5.泛型T

泛型T即为数据流

这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。

2.3.工作流程

在 Reactive Streams 中,PublisherSubscriberSubscriptionProcessor 之间的协作流程如下:

有时间再补流程图
在这里插入图片描述

  1. Publisher(发布者):

    • Publisher 是异步产生数据流的组件,它通过 subscribe 方法允许订阅者订阅。subscribe 方法会接收一个 Subscriber 对象作为参数。
    • Publisher 有新数据准备好时,通过调用订阅者的 onNext 方法将数据推送给订阅者。
    interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }
    
  2. Subscriber(订阅者):

    • Subscriber 是数据流的消费者,通过实现 Subscriber 接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n) 请求一定数量的数据,处理数据时通过 onNext 方法接收元素。
    • 当订阅者无法处理更多的元素时,可以调用 subscription.cancel() 来取消订阅。
    interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }
    
  3. Subscription(订阅):

    • Subscription 表示订阅关系,它在 onSubscribe 方法中被传递给订阅者。通过 Subscription,订阅者可以请求数据和取消订阅。
    • 订阅者通过 request(long n) 方法请求处理 n 个元素,通过 cancel() 方法取消订阅。
    interface Subscription {
        void request(long n);
        void cancel();
    }
    
  4. Processor(处理器):

    • Processor 是一个同时实现了 PublisherSubscriber 接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。
    • Processor 既能接收数据,也能发布数据。它将 onNextonErroronComplete 方法委托给下游的订阅者,并将数据推送给上游的发布者。
    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。

3.自定义实现Reactive Streams规范

自己实现了一个,参考了SubmissionPublisher

  • 同步实现的
  • 功能不完善
  • 有bug
class MyPublisher implements Flow.Publisher<String>{
    MySubscription<String> subscription;
    public int request ;
    public void publish(String item){
        subscription.items.add(item);
        while (true) {
            if (request > 0) {
                for (int i = 0; i < request; i++) {
                    if (!subscription.items.isEmpty()) {
                        try {
                            Object o = subscription.items.get(subscription.items.size() - 1);
                            subscription.subscriber.onNext(o.toString());
                            subscription.items.remove(o);
                        }catch (Exception e){
                            subscription.subscriber.onError(e);
                            return;
                        }
                    }

                }
            }
            if (subscription.items.isEmpty()) {
                break;
            }
        }
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        System.out.println("第一步:绑定订阅者" );
        MySubscription<String> subscription = new MySubscription<>(subscriber,this);
        this.subscription = subscription;
        subscriber.onSubscribe(subscription);
    }

}

class MySubscriber implements Flow.Subscriber<String>{

    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("第二步:接收Subscription" );
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("第四步:推送数据" );
        System.out.println("MySubscriber 消费了item = " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("出异常了 = " + throwable);
    }

    @Override
    public void onComplete() {

    }

}

class MySubscription<T> implements Flow.Subscription{

    final Flow.Subscriber<? super T> subscriber;
    final MyPublisher publisher;
    List items = new ArrayList();
    public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {
        this.subscriber = subscriber;
        this.publisher = publisher;
    }

    @Override
    public void request(long n) {
        this.publisher.request++;
        System.out.println("第三步:拉取请求" );
    }

    @Override
    public void cancel() {

    }
}
public class FlowDemo {

    public static void main(String[] args) {
        MyPublisher myPublisher = new MyPublisher();
        MySubscriber mySubscriber = new MySubscriber();
        myPublisher.subscribe(mySubscriber);
        myPublisher.publish("111");
        myPublisher.publish("222");
        myPublisher.publish(null);


    }
}

4.Jdk实现Reactive Streams使用示例

class SimplePublisher implements Flow.Publisher<Integer> {

    private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

    public void publishItems() {
        for (int i = 1; i <= 5; i++) {
            publisher.submit(i);
        }

        // 发布者完成发布
        publisher.close();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        publisher.subscribe(subscriber);
    }
}

class SimpleSubscriber implements Flow.Subscriber<Integer> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received item: " + item);
        // 处理完一个元素后请求下一个
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Processing completed.");
    }
}

public class ReactiveStreamsExample {

    public static void main(String[] args) throws InterruptedException {
        // 创建发布者和订阅者
        SimplePublisher simplePublisher = new SimplePublisher();
        SimpleSubscriber simpleSubscriber = new SimpleSubscriber();

        // 订阅者订阅发布者
        simplePublisher.subscribe(simpleSubscriber);

        // 发布者发布数据
        simplePublisher.publishItems();
        // 睡一觉,确保数据处理完成
        Thread.sleep(3000);
    }
}

学习打卡:Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1384630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

原生js实现拖拽效果

<!DOCTYPE html> <html> <head> <style> #mydiv { width: 200px; height: 200px; background-color: red; position: absolute; cursor: move; } </style> | </head> <body> <div id"mydiv">拖拽我…

苏州倍丰智能新型雾化粉末技术量产成功!金属3D打印全产业链更进一步

苏州倍丰智能深耕金属3D打印技术领域&#xff0c;以金属3D打印全产业链为目标&#xff0c;围绕金属3D打印设备&#xff0c;涵盖包括金属粉末前后处理设备、金属粉末原材料制备、先进工艺研发等多个领域&#xff0c;完成了一整条自上而下的金属3D打印全产业链。 近日&#xff0c…

java大数据hadoop2.9.2 Java编写Hadoop分析平均成绩

1、准备文件&#xff0c;例如score.txt&#xff0c;内容如下&#xff1a; zs k1 88 ls k1 98 ww k1 78 zs k2 88 ls k2 98 ww k2 78 zs k3 88 ls k3 98 ww k3 78 2、创建maven项目 <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --><d…

哈希表与哈希算法(Python系列30)

在讲哈希表数据结构和哈希算法之前&#xff0c;我想先刨析一下数组和python中的列表 首先来讲一下数组&#xff0c;我想在这提出一个疑问&#xff1a; 为什么数组通过索引查询数据的时间复杂度为O(1)&#xff0c;也就是不管数组有多大&#xff0c;算法的执行时间都是不变的。…

《SPSS统计学基础与实证研究应用精解》视频讲解:SPSS 软件的安装要求、启动与关闭

《SPSS统计学基础与实证研究应用精解》1.6 视频讲解 视频为《SPSS统计学基础与实证研究应用精解》张甜 杨维忠著 清华大学出版社 一书的随书赠送视频讲解1.6节内容。本书已正式出版上市&#xff0c;当当、京东、淘宝等平台热销中&#xff0c;搜索书名即可。本书旨在手把手教会使…

【ROS2】使用C++实现简单的发布订阅方

1 构建自定义数据类型 1、自定义消息类型Student 1.1 创建base_interfaces_demo包 1.2 创建Student.msg文件 string name int32 age float64 height 1.2 在cmakeLists.txt中增加如下语句 #增加自定义消息类型的依赖 find_package(rosidl_default_generators REQUIRED) # 为…

操作系统复习篇

目录 一、引论 1.1、操作系统的目标 方便性&#xff1a; 有效性&#xff1a; 可扩充性&#xff1a; 开放性&#xff1a; 1.2、操作系统的作用 用户与计算机硬件系统之间的接口&#xff1a; 计算机系统资源的管理者&#xff1a; 实现对计算机资源的抽象&#xff1a; 1…

PDF结构详解

文章目录 介绍前言高保真的文件什么是PDF&#xff1f;PDF的一些优点版本摘要谁在使用PDF&#xff1f;有用的免费软件谁应该阅读 构建一个简单PDF文件基本PDF语法File StructureDocument ContentPage Content 构建简单PDF文件头目录&#xff0c;交叉引用表和文件尾主要对象图形内…

Github Copilot最全的安装与使用教程:一款非常好用的AI编程工具

Github Copilot最全的安装与使用教程 第一章 安装1.安装 GitHub Copilot2.获取资格第二章 使用1.产生建议1.1 键入你想要完成的操作的注释1.2 CtrlI 2. 接受建议3.查看下一个建议3.接受部分建议4.在新选项卡接受建议5.完成多项功能6.聊天 GitHub Copilot 供经过验证的学生、教师…

DAY7--learning english

一、积累 1.instinct Bro showed me his primal instinct 老兄给我展示他原始的本能&#xff08;返祖现象&#xff09;. 2. assembly Todays assembly is about part of journey. 今天的集会是讲述关于旅程的一部分。 3.aluminum Aluminum Casting Motocycle Engine Cover. …

java基础知识点系列——数据输入(五)

java基础知识点系列——数据输入&#xff08;五&#xff09; 数据输入概述 Scanner使用步骤 &#xff08;1&#xff09;导包 import java.util.Scanner&#xff08;2&#xff09;创建对象 Scanner sc new Scanner(System.in)&#xff08;3&#xff09;接收数据 int i sc…

数据结构.线性表(2)

一、模板 例子&#xff1a; a: b: 二、基本操作的实现 &#xff08;1&#xff09;初始化 &#xff08;2&#xff09;销毁和清空 &#xff08;3&#xff09;求长度和判断是否为空 &#xff08;4&#xff09;取值 &#xff08;5&#xff09;查找 &#xff08;6&#xff09;插入 &…

使用VS2015在win7 x64上编译调试FFmpeg(附源码和虚拟机下载)

1. 前言 在文章《使用VS2017在win10 x64上编译调试FFmpeg&#xff08;附源码和虚拟机下载&#xff09;》中&#xff0c;我们在win10VS2017的环境下基于开源项目ShiftMediaProject完成了FFmpeg源码调试环境的配置。在win7VS2015的环境下&#xff0c;ShiftMediaProject配置过程和…

机械工业品电商平台android

文章目录 一、开发组件和项目结构1、分包完成初始化工具1.1 开源库介绍&#xff1a;1.2 框架搭建1.3 自定义application&#xff0c;初始化网络加载库1.4 配置Glide1.5 网络请求响应对象1.6 工具类JSONUtil&Utils 二、环境配置1、将服务器的基地址&#xff0c;填到android&…

【算法练习】leetcode算法题合集之二分查找篇

二分查找 LeetCode69.x的平方根 LeetCode69.x的平方根 只要小于等于就可以满足条件了。 class Solution {public int mySqrt(int x) {int left 0, right x;int ans -1;while (left < right) {int mid (right - left) / 2 left;if ((long) mid * mid < x) {ans mi…

杨中科 EFCore 第二部分 实体配置

实体的配置 约定配置 主要规则: 1:表名采用DbContext中的对应的DbSet的属性名。 2:数据表列的名字采用实体类属性的名字&#xff0c;列的数据类型采用和实体类属性类型最兼容的类型。 3:数据表列的可空性取决于对应实体类属性的可空性。 4:名字为Id的属性为主键&#xff0c;如…

isis实验

根据要求制作大概&#xff1a; 使用isis配置路由器&#xff1a; 配置好物理接口地址后配置isis 为实现r1访问r5的环回走r6,需要在r6上制作路由泄露&#xff1a; 在r5上产生r1的路由明细&#xff1a; 全网可达&#xff1a;

Linux学习之网络编程1(纯理论)

写在前面 刚刚更新完Linux系统编程&#xff0c;特别推荐大家去看的Linux系统编程&#xff0c;总共44个小时&#xff0c;老师讲的非常好&#xff0c;我是十天肝完的&#xff0c;每天大概看20集&#xff0c;每天还要以写blog的形式来写笔记来总结一下&#xff0c;虽然这十天有点…

Redis(概述、应用场景、线程模式、数据持久化、数据一致、事务、集群、哨兵、key过期策略、缓存穿透、击穿、雪崩)

目录 Redis概述 应用场景 Redis的线程模式 数据持久化 1.Rdb&#xff08;Redis DataBase&#xff09; 2.Aof&#xff08;Append Only File&#xff09; mysql与redis保持数据一致 redis事务 主从复制&#xff08;Redis集群) 哨兵模式 key过期策略 缓存穿透、击穿、…

Docker安装Jenkins,配置Maven和Java

前言 这是一个java的springboot项目&#xff0c;使用maven构建 安装准备 需要将maven和jdk安装在服务器上&#xff0c;Jenkins需要用到&#xff0c;还有创建一个jenkins的目录&#xff0c;安装命令如下&#xff1a; docker run -d -uroot -p 9095:8080 -p 50000:50000 --n…