RxJava介绍及基本原理

news2025/1/15 15:31:00

随着互联网的迅猛发展,Java已成为最广泛应用于后端开发的语言之一。而在处理异步操作和事件驱动编程方面,传统的Java多线程并不总是最佳选择。这时候,RxJava作为一个基于观察者模式、函数式编程和响应式编程理念的库,为我们提供了一种强大而灵活的解决方案。

简介

RxJava是 ReactiveX 家族的重要一员, ReactiveXReactive Extensions 的缩写,一般简写为 RxReactiveX官方给Rx的定义是:Rx是一个使用可观察数据流进行异步编程的编程接口。
在这里插入图片描述

ReactiveX 不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。它拓展了观察者模式,使你能够自由组合多个异步事件,而不需要去关心线程同步,线程安全并发数据以及I/O阻塞

RxJava在Java环境下使用,它通过Observable(可观测对象)和Subscriber(订阅者)来实现异步编程模型。Observable可以发射出一系列的数据流,而Subscriber则负责处理这些数据流。利用各种操作符,我们可以对数据流进行变换、过滤、合并等操作,从而完成复杂的异步任务。

GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
在这里插入图片描述
结论: RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observable序列组合异步和基于事件的程序的库;是一个 基于事件流、实现异步操作的库。

Observables · ReactiveX文档中文翻译

RxJava 是轻量级的

RxJava尽力做到非常轻巧。它仅关注Observable的抽象和与之相关的高层函数,实现为一个单独的JAR文件。

RxJava 是一个多语言实现

RxJava 支持Java 6或者更新的版本,以及其它的JVM语言如 Groovy, Clojure, JRuby, Kotlin 和 Scala。RxJava 可用于更多的语言环境,而不仅仅是Java和Scala,而且它致力于尊重每一种JVM语言的习惯。

RxJava 第三方库

下面是可与RxJava协作的第三方库:

  • Hystrix - 用于分布式系统的一个延时和容错处理框架

  • Camel RX - 一个用于Apache Camel 的 RxJava 兼容层

  • rxjava-http-tail - 让你可以跟踪HTTP日志,就像使用 tail -f 一样

  • mod-rxvertx - Extension for VertX - 使用 RxJava 封装的VertX库

  • rxjava-jdbc - 使用RxJava流式处理JDBC连接,还支持语句的函数式组合

  • rtree - 使用RxJava实现的一个纯内存的可变的R-tree和R*-tree

使用指南

你可以在Maven Central http://search.maven.org 找到用于Maven, Ivy, Gradle, SBT和其它构建工具需要的二进制文件和依赖信息.

Maven示例:

<dependency>
      <groupId>io.reactivex.rxjava3</groupId>
      <artifactId>rxjava</artifactId>
      <version>3.1.7</version>
</dependency>

RxJava使用三步曲

RxJava的使用可以概括为三个步骤:创建 Observable,定义 Observer 处理数据流,最后订阅(Subscribe)Observable。

创建 Observable

  • 可以直接使用 Observable.just() 方法来创建一个发射固定数据项的 Observable;

  • 也可以通过 Observable.fromIterable() 方法来创建包含多个数据项的 Observable。

Observable<String> observable = Observable.just("Hello", "World");

定义 Observer

创建一个 Observer 对象并实现它的各个方法。在这些方法中,你可以处理每个发射的数据项、对错误进行处理,或者在数据全部发射完毕时执行一些操作。

Observer<String> observer = new Observer<String>() {
	@Override
	public void onSubscribe(Disposable d) {
		// 在此方法中进行一些初始化操作或资源管理
	}

	@Override
	public void onNext(String s) {
		// 处理每个发射的数据项
		System.out.println(s);
	}

	@Override
	public void onError(Throwable e) {
		// 处理发生的异常情况
	}

	@Override
	public void onComplete() {
		// 完成所有的数据发射操作
	}
};

订阅 Observable

observable.subscribe(observer);

RxJava基本原理

生活例子引入

用一个生活例子引入,来介绍 RxJava的基本原理: 顾客到饭店吃饭
在这里插入图片描述

RxJava原理介绍

  • RxJava原理 基于 一种扩展的观察者模式

  • RxJava的扩展观察者模式中有4个角色:

角色作用类比
被观察者(Observable)产生事件顾客
观察者(Observer)接收事件,并给出响应动作厨房
订阅(Subscribe)连接 被观察者 & 观察者服务员
事件(Event)被观察者 & 观察者 沟通的载体

请结合上述 顾客到饭店吃饭 的生活例子理解:
在这里插入图片描述
RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。具体如下图:
在这里插入图片描述

代码实现

步骤1:创建被观察者 (**Observable**** )& 生产事件**

  • 即 顾客入饭店 - 坐下餐桌 - 点菜
// 步骤1:创建被观察者 (Observable )& 生产事件
// 即 顾客入饭店 - 坐下餐桌 - 点菜
// 1. 创建被观察者 Observable 对象
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
// create() 是 RxJava 最基本的创造事件序列的方法
// 此处传入了一个 OnSubscribe 对象参数
// 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式

// 2. 在复写的subscribe()里定义需要发送的事件
	@Override
	public void subscribe(ObservableEmitter<String> emitter) throws Exception {
		// 通过 ObservableEmitter类对象产生事件并通知观察者
		// ObservableEmitter类介绍
		// 2.1 定义:事件发射器
		// 2.2 作用:定义需要发送的事件 & 向观察者发送事件
		emitter.onNext("event01");
		emitter.onNext("event02");
		emitter.onNext("event03");
		emitter.onComplete();
	}
});

步骤2:创建观察者 (**Observer**** )并 定义响应事件的行为**

  • 即 开厨房 - 确定对应菜式

  • 发生的事件类型包括:Next事件、Complete事件 & Error事件。具体如下:

事件类型定义作用使用规则使用方法
Next普通事件向观察者发送需要响应事件的信号被观聚者可发送无限个Next事件;观察者可接受无限个Next事件onNext()
Complete表示所有的事件都已经成功完成(RxJava把所有时间当作队列处理)标志 被观察者 不再发送普通事件(Next)当被观察者发送了一个Complete事件后,被观察者在Complete事件后的事件将会继续发送,但观察者收到Complete事件后将不再继续接收任何事件;被观察者可以不发送Complete事件。onComplete()
Error事件队列异常事件标志 事件处理过程中出现异常(此时队列自动终止,不允许再有事件发出)当被观察者发送了一个Error事件后,被观察者在Error事件后的事件将会继续发送,但观察者收到Error事件后将不再继续接收任何事件;被观察者可以不发送Error事件。onError()
// 1. 创建观察者 (Observer )对象
Observer<String> observer = new Observer<String>() {
	// 2. 创建对象时通过对应复写对应事件方法 从而 响应对应事件
	// 观察者接收事件前,默认最先调用复写 onSubscribe()
	@Override
	public void onSubscribe(Disposable d) {

	}
	// 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
	@Override
	public void onNext(String value) {
		System.out.println("对Next事件作出响应" + value);
	}
	// 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
	@Override
	public void onError(Throwable e) {

	}
	// 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
	@Override
	public void onComplete() {

	}
};

步骤3:通过订阅(**Subscribe**)连接观察者和被观察者

  • 即 顾客找到服务员 - 点菜 - 服务员下单到厨房 - 厨房烹调
observable.subscribe(observer);
// 或者 observable.subscribe(subscriber);

Subject

来看⼀个⾮常特殊的类型- Subject ,为什么说它特殊呢?原因很简单:它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

由于一个Subject订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是"冷"的–就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个"冷"的Observable变成"热"的。

Subject的种类

针对不同的场景一共有四种类型的Subject。他们并不是在所有的实现中全部都存在,而且一些实现使用其它的命名约定(例如,在RxScala中Subject被称作PublishSubject)。

AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。

AsyncSubject asyncSubject = AsyncSubject.create();
// 发送事件
asyncSubject.onNext(1);
// 订阅
asyncSubject.subscribe(event -> {
	System.out.println(event);
});
asyncSubject.onNext(3);
// 再次发送事件
asyncSubject.onNext(4);
asyncSubject.onComplete();
// 只会监听到 事件4

在这里插入图片描述
PublishSubject

可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素。

// 初始化⼀个PublishSubject
PublishSubject publishSubject = PublishSubject.create();
// 发送事件
publishSubject.onNext(1);
// 订阅
publishSubject.subscribe(event -> {
	System.out.println(event);
});
// 再次发送事件
publishSubject.onNext(2);
publishSubject.onNext(3);
  • 事件1是无法被订阅的,只接受订阅之后的响应
    在这里插入图片描述
    BehaviorSubject

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1080088.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Nuget】程序包源

程序包源地址(部分) Azure 中国区的官方 NuGet 程序包源地址 https://nuget.cdn.azure.cn/v3/index.json 官方 NuGet 程序包源地址 V2 https://www.nuget.org/api/v2 官方 NuGet 程序包源地址 V3 https://api.nuget.org/v3/index.json MyGet 上 Eto.Forms 框架的程序包源地址 h…

杨冰:分布式数据库助力企业数实融合,跨越数字化转型深水区

近日&#xff0c;2023 inclusion外滩大会在上海黄浦世博园区举办。由赛迪顾问与 OceanBase 联合主办的外滩大会“分布式数据库助力数实融合”见解论坛圆满落幕。 会上&#xff0c;OceanBase CEO 杨冰发表了《分布式数据库助力企业数实融合&#xff0c;跨越数字化转型深水区》的…

一个完整的初学者指南Django-part1

源自&#xff1a;https://simpleisbetterthancomplex.com/series/2017/09/04/a-complete-beginners-guide-to-django-part-1.html 一个完整的初学者指南Django - 第1部分 介绍 今天我将开始一个关于 Django 基础知识的新系列教程。这是一个完整的 Django 初学者指南。材料分为七…

mysql面试题38:count(1)、count(*) 与 count(列名) 的区别

该文章专注于面试&#xff0c;面试只要回答关键点即可&#xff0c;不需要对框架有非常深入的回答&#xff0c;如果你想应付面试&#xff0c;是足够了&#xff0c;抓住关键点 面试官&#xff1a; count(1)、count(*) 与 count(列名) 的区别 当使用COUNT函数进行数据统计时&…

echarts折线图(其他图也是一样)设置tooltip自动滚动

按顺序自动滚动效果 <div class"leftComp-charts" id"chartsBox"></div>chartsData: {roadNorm: [],time: []},eChartsTimer: nullinitChartsBox() {this.option {tooltip: {trigger: "axis",axisPointer: {// 方法一type: "s…

2023年中国稻谷加工机械分类、市场规模及发展前景分析[图]

稻谷加工机械设备主要包括砻谷机、碾米机、抛光机、碎米机和砻糠机&#xff1b;通过物理和机械方式将稻谷加工成可供人们食用的大米&#xff0c;同时还可以提取出一些有价值的副产品&#xff0c;如砻糠可以用作饲料。 稻谷加工机械制造行业分类 资料来源&#xff1a;共研产业咨…

设计模式-相关内容

文章目录 一、设计模式概述二、UML图1.类的表示方法2.类与类之间关系的表示方法(1)关联关系(2)聚合关系(3)组合关系(4)依赖关系(5)继承关系(6)实现关系 三、软件设计原则1.开闭原则2.里氏代换原则3.依赖倒转原则4.接口隔离原则5.合成复用原则6.迪米特法则 一、设计模式概述 创…

集群分发脚本xysnc

一、scp&#xff08;secure copy&#xff09; 安全拷贝 1.定义 scp&#xff08;Secure Copy&#xff09;是一个用于在不同计算机之间安全地复制文件和目录的命令行工具。它使用 SSH 协议进行连接和文件传输&#xff0c;提供了加密和身份验证机制&#xff0c;确保数据传输的安…

Android 项目增加 res配置

main.res.srcDirs "src/main/res_test" build->android->sourceSets

从裸机启动开始运行一个C++程序(七)

前序文章请看&#xff1a; 从裸机启动开始运行一个C程序&#xff08;六&#xff09; 从裸机启动开始运行一个C程序&#xff08;五&#xff09; 从裸机启动开始运行一个C程序&#xff08;四&#xff09; 从裸机启动开始运行一个C程序&#xff08;三&#xff09; 从裸机启动开始运…

Mall脚手架总结(四) —— SpringBoot整合RabbitMQ实现超时订单处理

前言 在电商项目中&#xff0c;订单因为某种特殊情况被取消或者超时未支付都是比较常规的用户行为&#xff0c;而实现该功能我们就要借助消息中间件来为我们维护这么一个消息队列。在mall脚手架中选择了RabbitMQ消息中间件&#xff0c;接下来荔枝就会根据功能需求来梳理一下超时…

SRE实战:如何低成本推进风险治理?稳定性与架构优化的3个策略

一分钟精华速览 SRE 团队每天面临着不可控的各类风险和重复发生的琐事&#xff0c;故障时疲于奔命忙于救火。作为技术管理者&#xff0c;你一直担心这些琐事会像滚雪球一样&#xff0c;越来越多地、无止尽地消耗你的团队&#xff0c;进而思考如何系统性地枚举、掌控这些风险&a…

ctf中ping命令执行绕过

相关wp参考&#xff1a;CTF中的命令执行绕过方式 - 知乎 CTFping命令绕过及符号用法_ctf ping-CSDN博客 在用linux命令时候,我们可以 一行执行多条命令 或者 有条件的执行下一条命令 linux命令中一些符号的用法 1. “;”分号用法 方式&#xff1a;command1 ; command…

【ccf-csp题解】第7次csp认证-第三题-路径解析超详细题解-字符串模拟

本题思路来源于acwing ccfcsp认证课 题目描述 思路分析 首先&#xff0c;为了处理路径中的反斜杠符号&#xff0c;我们可以实现一个get函数&#xff0c;把一个路径中每一对反斜杠之间的内容存到vector<string>中&#xff0c;如果有连续的多个反斜杠则只看成一个 举个例…

“.NET视频总结:认识框架的结构和组件,掌握开发工具的奥妙“一

目录 第一单元&#xff1a;二十一世纪程序执行 背景: 总结&#xff1a; 第二单元:对象导向与类别设计 背景: 总结&#xff1a; 第三单元&#xff1a;使用类别与基底类别库 总结: 第四单元:Windows开发程序 背景: 总结: 第五单元:防护式程序设计 背景: 总结: 第六…

数据库中的DECODE函数,SIGN函数

oracle中的if(),oracle中if/else的三种实现方式详解_电竞GO的博客-CSDN博客 DECODE(CONCAT(b.AZZ231,%),%,,CONCAT(b.AZZ231,%)) czwcl,

Acrel-6000电气火灾监控系统应用

安科瑞 崔丽洁 摘要 建筑电气火灾在建筑物火灾中占较大的比例&#xff0c;起火原因也很多&#xff0c;包括短路、过热、漏电、雷击和电气等故障&#xff0c;火灾危害也较大。因此&#xff0c;各种原因引起的火灾都应得到有效控制。目前&#xff0c;短路、过热、雷击等保护措施…

【Java网络编程】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 前言 Java是一种广泛应用于网络编程的编程语言。通过Java的网络编程能力&#xff0c;我们可以构建强大的网络应用程序。本文将介绍Java网络编程的基础知识、常用API和一些实…

nginx目录穿越

测试nginx版本为nginx/1.23.3 location /file {alias /home/;} 在/usr跟目录下新建a.txt测试文件 通过访问 http://{ip}:{端口}/file../test.txt 实现目录穿越 防护:location与alias的值都加上/或不加/

MongoDB实践

MongoDB学习 MongoDB简介 MongoDB 是一种流行的文档型 NoSQL 数据库&#xff0c;它具有以下特点和应用场景&#xff1a; 文档型数据库&#xff1a;MongoDB 使用 BSON&#xff08;Binary JSON&#xff09;格式的文档来存储数据。每个文档可以具有不同的字段&#xff0c;这使得…