响应式编程(Reactive Programming)介绍

news2025/1/11 15:02:38

什么是响应式编程?

在互联网上有着一大堆糟糕的解释与定义。Wikipedia 一如既往的空泛与理论化。Stackoverflow 的权威答案明显不适合初学者。Reactive Manifesto 看起来是你展示给你公司的项目经理或者老板们看的东西。微软的 Rx terminology"Rx = Observables + LINQ + Schedulers" 过于重量级且微软味十足,只会让大部分人困惑。相对于你所使用的 MV* 框架以及钟爱的编程语言,“Reactive” 和 “Propagation of change” 这些术语并没有传达任何有意义的概念。框架的 Views 层当然要对 Models 层作出反应,改变当然会传播。如果没有这些,就没有东西会被渲染了。

所以不要再扯这些废话了。

响应式编程是使用异步数据流进行编程

一方面,这并不是什么新东西。Event buses 或者 Click events 本质上就是异步事件流,你可以监听并处理这些事件。响应式编程的思路大概如下:你可以用包括 Click 和 Hover 事件在内的任何东西创建 Data stream。Stream 廉价且常见,任何东西都可以是一个 Stream:变量、用户输入、属性、Cache、数据结构等等。举个例子,想像一下你的 Twitter feed 就像是 Click events 那样的 Data stream,你可以监听它并相应的作出响应。

在这个基础上,你还有令人惊艳的函数去组合、创建、过滤这些 Streams,这就是函数式魔法的用武之地。Stream 能接受一个,甚至多个 Stream 为输入,你可以融合两个 Stream,也可以从一个 Stream 中过滤出你感兴趣的 Events 以生成一个新的 Stream,还可以把一个 Stream 中的数据值 映射到一个新的 Stream 中。

既然 Stream 在响应式编程中如此重要,那么我们就应该好好的了解它们,就从我们熟悉的"Clicks on a button" Event stream 开始。

在这里插入图片描述
Stream 就是一个按时间排序的 Events 序列,它可以放射三种不同的 Events:(某种类型的)Value、Error 或者一个" Completed" Signal。考虑一下"Completed"发生的时机,例如,当包含这个按钮的窗口或者视图被关闭时。

通过分别为 Value、Error、“Completed"定义事件处理函数,我们将会异步地捕获这些 Events。有时可以忽略 Error 与"Completed”,你只需要定义 Value 的事件处理函数就行。监听一个 Stream 也被称作是订阅 ,而我们所定义的函数就是观察者,Stream则是被观察者,其实就是 Observer Design Pattern。

上面的示意图也可以使用ASCII重画为下图,在下面的部分教程中我们会使用这幅图:

--a---b-c---d---X---|->

a, b, c, d are emitted values
X is an error
| is the 'completed' signal
---> is the timeline 

既然已经开始对响应式编程感到熟悉,为了不让你觉得无聊,我们可以尝试做一些新东西:我们将会把一个 Click event stream 转为新的 Click event stream。

首先,让我们做一个能记录一个按钮点击了多少次的计数器 Stream。在常见的响应式编程库中,每个Stream都会有多个方法,如 map, filter, scan, 等等。当你调用其中一个方法时,例如 clickStream.map(f),它就会基于原来的 Click stream 返回一个新的 Stream 。它不会对原来的 Click steam 作任何修改。这个特性称为不可变性,它对于响应式编程 Stream,就如果汁对于薄煎饼。我们也可以对方法进行链式调用,如 clickStream.map(f).scan(g):

clickStream: ---c----c--c----c------c-->
           vvvvv map(c becomes 1) vvvv
           ---1----1--1----1------1-->
           vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5--> 

map(f) 会根据你提供的 f 函数把原 Stream 中的 Value 分别映射到新的 Stream 中。在我们的例子中,我们把每一次 Click 都映射为数字 1。scan(g) 会根据你提供的 g 函数把 Stream 中的所有 Value 聚合成一个 Value x = g(accumulated, current) ,这个示例中 g 只是一个简单的添加函数。然后,每 Click 一次, counterStream 就会把点击的总次数发给它的观察者。

为了展示响应式编程真正的实力,让我们假设你想得到一个包含“双击”事件的 Stream。为了让它更加有趣,假设我们想要的这个 Stream 要同时考虑三击(Triple clicks),或者更加宽泛,连击(两次或更多)。深呼吸一下,然后想像一下在传统的命令式且带状态的方式中你会怎么实现。我敢打赌代码会像一堆乱麻,并且会使用一些变量保存状态,同时也有一些计算时间间隔的代码。

而在响应式编程中,这个功能的实现就非常简单。事实上,这逻辑只有 4 行代码。但现在我们先不管那些代码。用图表的方式思考是理解怎样构建Stream的最好方法,无论你是初学者还是专家。

在这里插入图片描述
灰色的方框是用来转换 Stream 函数的。首先,简而言之,我们把连续 250 ms 内的 Click 都积累到一个列表中(就是 buffer(stream.throttle(250ms) 做的事。不要在意这些细节,我们只是展示一下响应式编程而已)。结果是一个列表的 Stream ,然后我们使用 map() 把每个列表映射为一个整数,即它的长度。最终,我们使用 filter(x >= 2) 把整数 1 给过滤掉。就这样,3 个操作就生成了我们想要的 Stream。然后我们就可以订阅(“监听”)这个 Stream,并以我们所希望的方式作出反应。

我希望你能感受到这个示例的优美之处。这个示例只是冰山一角:你可以把同样的操作应用到不同种类的 Stream 上,例如,一个 API 响应的 Stream;另一方面,还有很多其它可用的函数。

为什么我要使用响应式编程(RP)?

响应式编程提高了代码的抽象层级,所以你可以只关注定义了业务逻辑的那些相互依赖的事件,而非纠缠于大量的实现细节。RP 的代码往往会更加简明。

特别是在开发现在这些有着大量与数据事件相关的 UI events 的高互动性 Webapps、手机 apps 的时候,RP 的优势就更加明显。10年前,网页的交互就只是提交一个很长的表单到后端,而在前端只产生简单的渲染。Apps 就表现得更加的实时了:修改一个表单域就能自动地把修改后的值保存到后端,为一些内容"点赞"时,会实时的反应到其它在线用户那里等等。

现在的 Apps 有着大量各种各样的实时 Events,以给用户提供一个交互性较高的体验。我们需要工具去应对这个变化,而响应式编程就是一个答案。

响应式编程特点

传统的编程方式,是顺序执行的:上一个任务没有完成,需要等待,直至完成之后,才会执行下一个任务。无论是提升机器的性能还是代码的性能,本质上都需要依赖上一个任务的完成。如果需要响应迅速,就得把同步执行的方式换成异步,方法执行变成消息发送。这变成了异步编程的方式,它是响应式编程的重要特性之一。

  • 异步编程:提供了合适的异步编程模型,能够挖掘多核 CPU 的能力、提高效率、降低延迟和阻塞等。
  • 数据流:基于数据流模型,响应式编程提供一套统一的 Stream 风格的数据处理接口。和 Java 8 中的 Stream 相比,响应式编程除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
  • 变化传播:简单来说就是以一个「数据流」为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。这就有点像函数式编程中的组合函数(compose),将多个函数串联起来,把一组输入数据转化为格式迥异的输出数据。

响应式编程优点

响应式编程在执行过程中是异「步非阻塞」,可以充分利用多核 CPU 的性能,并且可以由底层的执行模型,负责通过数据流来自动传播变化,可以做到更实时的响应;使用响应式编程风格开发的程序,能支持更大的 吞吐量(备注:支持更大的吞吐量,不代表程序跑的更快;因为响应式编程只是让执行的主线程不会因为某些耗时操作而阻塞,导致其阻塞后面的请求,但实际执行耗时操作的子线程消耗的时间依然是没有变化)。

响应式编程缺点

  • 响应式编程风格,与传统的命令式编程风格差异大,且响应式编程主要是异步的,相较于同步的编程方式在思维上有所变化,需要一定的学习成本;
  • 因为其基于「事件」驱动,每次触发事件会使用新线程执行;如果在一个不会阻塞的程序中使用,线程切换可能比程序本身执行耗时要多;
  • 对于异常的处理,因为基于回调或者声明式的原因,在使用匿名回调时,没有寻址能力,意味着在整个数据处理链中,每个处理节点的处理成功或错误的状态,不会向外界发送相应信号,其中某个节点出错异常可能会被吞掉,或者进行了相关处理,但得到的异常信息少,不好定位。

具体实现

ReactiveX ,是响应式编程原则的一种实现,通过使用可观察序列,组成异步和基于事件的程序。使用 RX,您的代码创建并订阅名为 Observables 的数据流。虽然反应式编程是关于概念的,但 RX 为您提供了一个惊人的工具箱。通过结观察者和迭代器模式以及函数式习语,RX 为您提供了超能力。您拥有一系列功能来组合、合并、过滤、转换和创建数据流。

值得一提的是,ReactiveX 可以说无处不在,对前端、跨平台、后端都进行了适配,并提供相对应用的工具。在 Web 上可以使用 RxJS (A reactive programming library for JavaScript. 已有 27.5K Star),在移动设备上使用 Rx.NET 和 RxJava 操作 UI 事件和 API 响应。在 Java 平台,支持响应式编程的流行库有 Reactor , RxJava (Reactive Extensions for the JVM), Vert.x 等,而在Java9中,实现了响应式流规范(Reactive Streams)所定义的相关接口,让Java本身支持了响应式编程,不需要通过其他三方库实现。

此外,前端领域还有很多其他更进一步的实现,诸如 Cycle.js :用于可预测代码的功能性和反应式 JavaScript 框架。

RxJS:JS 的反应式编程库

RxJS JavaScript 的反应式(响应式)编程库,它通过使用可观察(Observable)序列,来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 Array 启发的操作符 (map、filter、reduce、every 等等),这些数组操作符可以把异步事件作为集合来处理。🉑️将 RxJS 视为事件的 Lodash。

基础知识

响应式代码的基本组成部分是Observables和Subscribers(事实上Observer才是最小的构建块,但实践中使用最多的是Subscriber,因为Subscriber才是和Observables的对应的。)。Observable发送消息,而Subscriber则用于消费消息。

消息的发送是有固定模式的。Observable可以发送任意数量的消息(包括空消息),当消息被成功处理或者出错时,流程结束。Observable会调用它的每个Subscriber的Subscriber.onNext()函数,并最终以Subscriber.onComplete()或者Subscriber.onError()结束。

这看起来像标准的观察者模式, 但不同的一个关键点是:Observables一般只有等到有Subscriber订阅它,才会开始发送消息(术语上讲就是热启动Observable和冷启动Observable。热启动Observable任何时候都会发送消息,即使没有任何观察者监听它。冷启动Observable只有在至少有一个订阅者的时候才会发送消息(我的例子中都是只有一个订阅者)。这个区别对于开始学习RxJava来说并不重要。)。换句话说,如果没有订阅者观察它,那么将不会起什么作用。

Hello, World!

让我们以一个具体例子来实际看看这个框架。首先,我们创建一个基本的Observable:

Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);

我们的Observable发送“Hello,world!”消息然后完成。现在让我们创建Subscriber来消费这个数据:

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};

上面代码所做的工作就是打印由Observable发送的字符串。现在我们有了myObservable和mySubscriber,就可以通过subscribe()函数把两者关联起来:

myObservable.subscribe(mySubscriber);
// Outputs "Hello, world!"

当订阅完成,myObservable将调用subscriber的onNext()和onComplete()函数,最终mySubscriber打印“Hello, world!”然后终止。

RxJava2 的核心原理

1、简单的链式调用(无线程切换)

先来看一段示例代码:

Observable.create(object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: ObservableEmitter<String>) {
                Log.d("solart", "subscribe > ${Thread.currentThread().name}")
                emitter.onNext("test")
                emitter.onComplete()
            }
        }).flatMap(object : Function<String, Observable<String>> {
            override fun apply(t: String): Observable<String> {
                return Observable.just(t)
            }
        }).map(object : Function<String, Int> {
            override fun apply(t: String): Int {
                return 0
            }
        }).subscribe(object : Observer<Int> {
                override fun onSubscribe(d: Disposable) {
                    Log.d("solart", "onSubscribe >  ${Thread.currentThread().name}")
                }

                override fun onNext(t: Int) {
                    Log.d("solart", "onNext >  ${Thread.currentThread().name}")
                }
                
                override fun onComplete() {
                    Log.d("solart", "onComplete >  ${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    Log.d("solart", "onError >  ${Thread.currentThread().name}")
                }
        })

这段代码中我们简单用了 create、flatMap、map等操作符,进行了流式的数据转换,最后我们通过 subscribe 订阅了数据流,其实通过查看源码我们不难发现, RxJava 本身是个逆向订阅的过程,话不多说先看图

在这里插入图片描述

2、异步事件流编程(线程切换)

相信有了上面的分析,大家对 RxJava 的逆向订阅以及数据流转有了一定的认识,但是 RxJava 的强大之处在于它的异步事件流编程方式,随心所欲的切换工作线程,下面我们来分析它是如何做到的。

同样的我们还是先给出一个简单的示例:

Observable.create(object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    Log.d("solart", "subscribe >  ${Thread.currentThread().name}")
                    emitter.onNext("test")
                    emitter.onComplete()
                }
            }).subscribeOn(Schedulers.io())
                .map(object : Function<String, Int> {
                override fun apply(t: String): Int {
                    return 0
                }
            }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Observer<Int> {
               
                override fun onSubscribe(d: Disposable) {
                    Log.d("solart", "onSubscribe >  ${Thread.currentThread().name}")
                }

                override fun onNext(t: Int) {
                    Log.d("solart", "onNext >  ${Thread.currentThread().name}")
                }
                
                override fun onComplete() {
                    Log.d("solart", "onComplete >  ${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    Log.d("solart", "onError >  ${Thread.currentThread().name}")
                }

            })

这里简化了操作符的调用,以切换线程为示例,根据这段代码,我画出了这个过程的流程图(灵魂画手有没有?)如下:

在这里插入图片描述
图中不同颜色(红、绿、紫)的实线表示流程所属不同线程,体现在不同线程中的过程,且标上了对应的序号,方便大家观看,这个图已经能够揭示 RxJava 运转的核心原理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/355241.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WebGPU学习(1)---在WebGPU上绘制三角形

在本文中&#xff0c;我们将使用 WebGPU 绘制一个简单的三角形。示例地址 初始化 WebGPU WebGPU 初始化的流程比 WebGL 要更复杂。 在 WebGL 中&#xff0c;我们只需从 Canvas 元素获取 WebGL 渲染上下文&#xff0c;如 getContext(“webgl” 或者 “webgl2”)。 const gl …

浙江工业大学关于2023年MBA考试初试成绩查询及复试阶段说明

根据往年的情况&#xff0c;2023浙江工业大学MBA考试初试成绩可能将于2月21日公布&#xff0c;为了广大考生可以及时查询到自己的分数&#xff0c;杭州达立易考教育为大家汇总了信息。 1、初试成绩查询&#xff1a;考生可登录中国研究生招生信息网“全国硕士研究生招生考…

Redis学习【9】之Redis RDB持久化

文章目录一 AOF(Append Only File) 持久化二 AOF 基础配置2.1 AOF的开启2.2 文件名配置2.3 混合式持久化开启2.4 AOF 文件目录配置三 AOF 文件格式3.1 Redis 协议3.2 查看 AOF 文件3.3 清单文件3.4 Rewrite 机制3.4.1 rewrite简介3.4.2 rewrite 计算策略3.4.3 手动开启 rewrite…

极客时间左耳听风-高效学习

左耳听风——高效学习篇 P95 | 高效学习&#xff1a;端正学习态度 本人真实⬇️⬇️⬇️⬇️ “ 大部分人都认为自己爱学习&#xff0c;但是&#xff1a; 他们都是只有意识没有行动&#xff0c;他们是动力不足的人。 他们都不知道自己该学什么&#xff0c;他们缺乏方向和目标。…

基于RK3588的嵌入式linux系统开发(四)——uboot镜像下载(基于RKDevTool工具)

官方提供的SDK中包含RKDevTool工具&#xff08;RKDevTool_Release_v2.92&#xff09;和相应的驱动&#xff08;DriverAssitant_v5.1.1&#xff09;。本节主要介绍在windows操作系统环境下利用RKDevTool下载以上生成的uboot镜像和bootloader镜像。注意&#xff1a;本节使用的板卡…

什么是Type-c口?Type-c口有什么优势?

什么是Type-C接口 Type-C接口有哪些好处坏处 说起“Type-C”&#xff0c;相信大家都不会陌生&#xff0c;因为最近拿它大做文章的厂商着实不少&#xff0c;但要具体说清楚Type-C是什么&#xff0c;估计不少人只能说出“可以正反插”“USB的一种”之类的大概。其实&#xff0c;T…

JavaEE|网络编程基础与Socket套接字

文章目录一、为什么需要网络编程二、什么是网络编程三、网络编程中的基本概念1.发送端和接收端2.请求和响应3.客户端和服务端4.常见的客户端服务端模型四、Socket套接字概念及分类1.概念2.分类1&#xff09;流套接字&#xff1a;使用传输层TCP协议2&#xff09;数据报套接字&am…

LeetCode 430. 扁平化多级双向链表

原题链接 难度&#xff1a;middle\color{orange}{middle}middle 题目描述 你会得到一个双链表&#xff0c;其中包含的节点有一个下一个指针、一个前一个指针和一个额外的 子指针 。这个子指针可能指向一个单独的双向链表&#xff0c;也包含这些特殊的节点。这些子列表可以有一…

2023年前端面试知识点总结(JavaScript篇)

近期整理了一下高频的前端面试题&#xff0c;分享给大家一起来学习。如有问题&#xff0c;欢迎指正&#xff01; 1. JavaScript有哪些数据类型 总共有8种数据类型&#xff0c;分别是Undefined、Null、Boolean、Number、String、Object、Symbol、BigInt Null 代表的含义是空对象…

蓝图通讯之事件分发器用法

在事件分发器点 加号添加一个分发器 image.png在蓝图开始运行就进行绑定这个事件, image.png他会创建一个以分发器开头_事件的东西. 而绑定类似,只是没有创建和连接 image.pngimage.png然后 我们设置一个键触发这个逻辑. image.png最后验证多次输入多次触发,类似编程中的事件订…

【博客626】不同类型的ARP报文作用以及ARP老化机制

不同类型的ARP报文作用以及ARP老化机制 1、ARP协议及报文 2、不同类型的ARP报文作用 3、ARP工作原理 4、ARP老化机制 5、Linux ARP老化机制 ARP状态机&#xff1a; 在上图中&#xff0c;我们看到只有arp缓存项的reachable状态对于外发包是可用的&#xff0c;对于stale状态的…

excel应用技巧:F功能键诸多应用汇总

F1&#xff5e;F12&#xff0c;个个都是Excel操作的好帮手。单单一个F4键就有多种用法&#xff0c;其中第4种等间距复制图形&#xff0c;真的很神奇呀。利用快捷键可以代替鼠标做一些工作&#xff0c;也可以实实在在提高我们日常的工作效率&#xff0c;今天就先来跟大家一起分享…

数据结构与算法—队列

队列 队列介绍 有序列表&#xff0c;可以用数组或者链表实现。遵循先进先出原则。 数组实现队列 public class ArrayQueue {public static void main(String[] args) {ArrayQueue queue new ArrayQueue(3);// 接收用户输入char key ;Scanner sc new Scanner(System.in);…

PyQt5 自定义富文本编辑器

介绍 一款使用PyQt5和网页端框架wangEditor集成的富文本编辑器 代码片段 PyQt5客户端 与网页端建立连接def create_connect(self):self.web_view QWebEngineView()self.bridge JSBridge(self.web_view.page())self.web_view.load(QUrl.fromLocalFile(self.editor_path))w…

现代卷积神经网络经典架构图

卷积神经网络&#xff08;LeNet&#xff09; LeNet 的简化版深层卷积神经网络&#xff08;AlexNet&#xff09; 从LeNet&#xff08;左&#xff09;到AlexNet&#xff08;右&#xff09;改进&#xff1a; dropOut层 - 不改变期望但是改变方差ReLU层 - 减缓梯度消失MaxPooling数…

2.18 设置language和中文输入法

文章目录一&#xff1a;设置language二&#xff1a;设置中文输入法一&#xff1a;设置language nvidia的开发板上默认只有English&#xff0c;需要点击如下管理&#xff1a; 接着进入如下界面&#xff1a; 此时图中的“汉语&#xff08;中国&#xff09;”应该是没有的&…

Kubernetes是个什么东东?

Kubernetes 是一个可移植、可扩展的开源平台&#xff0c;用于管理容器化的工作负载和服务&#xff0c;可促进声明式配置和自动化。 Kubernetes 拥有一个庞大且快速增长的生态&#xff0c;其服务、支持和工具的使用范围相当广泛。 Kubernetes 这个名字源于希腊语&#xff0c;意…

NoMachine 输入用户名密码后 闪断 解决办法

大家好&#xff0c;我是虎哥&#xff0c;最近工作忙&#xff0c;好长时间没有继续套件的深度学习&#xff0c;今天周六&#xff0c;难得有空&#xff0c;泡好茶&#xff0c;打开电脑&#xff0c;链接套件桌面&#xff0c;得&#xff0c;出问题了&#xff0c;一个很奇怪的问题&a…

[教你传话,表白,写信]

第一步 关注飞鸽传话助手 第二部 点击链接进入 第三步 点击发送,输入内容 第四步 就可以收到了

Simulink 自动代码生成电机控制:STM32 Encoder编码器使用总结

目录 Encoder 原理 STM32 Encoder 计数原理 模型仿真 模拟Encoder 基于Encoder计算角度和速度 关于启动的仿真 代码生成 运行演示 总结 总结一下基于STM32的Encoder接口的电机运行&#xff0c;相应的仿真和实验都是基于一个1024脉冲的增量式光电编码器&#xff0c;关于…