RxJava异步编程初探

news2025/1/19 11:25:48

                      


RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。

RxJava 有以下三个基本的元素:

  1. 被观察者(Observable)
  2. 观察者(Observer)
  3. 订阅(subscribe)

下面来说说以上三者是如何协作的:

被观察者发送的事件有以下几种,总结如下表:


 

移入Rx2Java2jar包,该jar包让我们异步编程变得更加简单
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

撸点代码运行看看概况:

package org.jd.data.netty.big.window.chat.reactor;

import com.sun.istack.internal.NotNull;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

/**
 * Cold的生产者一个对应多个消费者:
 * 消息发送者与各个订阅者之间是独立的,并不是共享生产者发送的消息,
 * 而是单独给每个订阅者从新参数,所有说,订阅者收到的消息不是相同的
 * 只有观察者订阅了,才开始发送数据
 */
public class ColdObservableOperation {
    // 创建一个订阅者
    static Consumer<Long> subscriber1 = new Consumer<Long>() {

        @Override
        public void accept(Long aLong) {
            System.out.println(" 订阅者:subscriber1 receive message : " + aLong);
        }
    };
    // 创建一个订阅者
    static Consumer<Long> subscriber2 = new Consumer<Long>() {
        @Override
        public void accept(Long message) throws Exception {
            System.out.println("         订阅者:subscriber2 receive message : " + message);
        }
    };

    public static void main(String args[]) {
        // 被观察者
        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            // 被观察者发射器
            @Override
            public void subscribe(@NotNull ObservableEmitter<Long> observableEmitter) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(observableEmitter::onNext); // 每个10毫秒调用发送一次数据

            }
        }).observeOn(Schedulers.newThread());
        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            // 主线程等100毫秒
            Thread.sleep(10000000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}

package org.jd.data.netty.big.window.chat.reactor;

import io.reactivex.annotations.NonNull;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

/**
 * 在RxJava中,被观察者,观察者,Subscribe()三者缺一不可
 * 只有使用了subscribe(),被观察者才会开始发送数据,这一点比较重要
 */
public class RxJava2ReactiveHelloWorld {
    /**
     * 观察输出的顺序
     * @param args
     */
    public static void main(String args[]) {
        // 被观察者
        Observable.just("你好,杨哥欢迎来到RxJava2的异步编程世界,这里你将学到关于RxJava2的编程思想!")
                .subscribe(new Observer<String>() { // 该观察者有四个方法

                    @Override
                    public void onSubscribe(@NonNull Disposable disposable) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        System.out.println(s);
                    }

                    @Override
                    public void onError(@NonNull Throwable throwable) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
    }
}

package org.jd.data.netty.big.window.chat.reactor;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;

/**
 * RxJava2入门HelloWord版本
 */
public class RxJava2HelloWorld {
    public static void main(String args[]) {
        simpleLambdaObservable();
        System.out.println("===========================Lambda版本===================================");
        primitiveRxJava2Observable();
        System.out.println("=============================简化版本=================================");
        simpleSimplifyRxJava2Observable();

    }

    /**
     * Lambda表达式简化版本
     */
    private static void simpleLambdaObservable() {
        Observable.create((ObservableOnSubscribe<String>) observableEmitter -> observableEmitter.onNext("Hello World!"))
                .subscribe(System.out::println); // 订阅发射器发射的内容
    }

    /**
     * 原始版本,便于理解
     */
    private static void primitiveRxJava2Observable() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("Hello World!");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    /**
     * 简化的版本:
     * RxJava是一种新的编程思想,为我们异步编程由此变得更加简单
     */
    public static void simpleSimplifyRxJava2Observable() {
        Observable.just("Hello World !").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.err.println(s);
            }
        });
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/821550.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

prometheus+grafana进行服务器资源监控

在性能测试中&#xff0c;服务器资源是值得关注一项内容&#xff0c;目前&#xff0c;市面上已经有很多的服务器资 源监控方法和各种不同的监控工具&#xff0c;方便在各个项目中使用。 但是&#xff0c;在性能测试中&#xff0c;究竟哪些指标值得被关注呢&#xff1f; 监控有…

SqlSugar、Freesql、Dos.ORM、EF、四种ORM框架的对比

SqlSugar、Freesql、Dos.ORM、EF、四种ORM框架的对比 一、默认情况下,导航属性是延迟查询; 答:ORM(Object-relational mapping)即对象关系映射,是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。也就是说,ORM是通过使用描述对象和数据库之间映射的元数据…

线程状态

从卖包子的案例学习进程间的通信 public class Test {public static void main(String[] args) {Object objnew Object();Thread th1new Thread(){Overridepublic void run() {synchronized (obj){System.out.println("来三个包子&#xff01;");try {obj.wait(); /…

IDEA删除本地git仓库、创建本地git仓库、关联其他仓库并上传

IDEA删除本地git仓库、创建本地git仓库、关联其他仓库并上传 删除本地Git仓库 创建本地Git仓库 关联其他仓库并上传 要在IntelliJ IDEA中删除本地Git仓库并创建新的本地Git仓库&#xff0c;以及关联其他仓库并上传&#xff0c;请按照以下步骤进行操作&#xff1a; 删除本地G…

【笔记】数据结构与算法 python-03-列表查找

列表查找 在一个数据结构中&#xff0c;通过一定的方法找出与给定关键字相同的数据元素的过程。 列表查找&#xff08;线性表查找&#xff09;&#xff1a;从列表&#xff08;一种线性数据结构&#xff0c;元素按照一定的顺序存储&#xff0c;每个元素都有一个唯一的位置索引…

网络出口技术中的单一出口网络结构,你会用吗?

我们在设计一个园区网络的时候&#xff0c;园区网络的出口需要和运营商的网络进行对接&#xff0c;从而提供internet服务。 在和运营商网络对接的时候&#xff0c;一般采用如下3终方式&#xff1a; 单一出口网络结构 1、网络拓扑 终端用户接入到交换机&#xff0c;交换机直…

干货 ,ChatGPT 4.0插件Review Reader,秒杀一切选品神器

Hi! 大家好&#xff0c;我是专注于AI项目实战的赤辰&#xff0c;今天继续跟大家介绍另外一款GPT4.0插件Review Reader&#xff08;评论阅读器&#xff09;。 做电商领域的小伙伴们&#xff0c;都知道选品分析至关重要&#xff0c;可以说选品决定成败&#xff0c;它直接关系到产…

MySQL用通配符过滤数据

简单的不使用通配符过滤数据的方式使用的值都是已知的&#xff0c;但是当搜索产品名中包含ashui的所有产品时&#xff0c;用简单的比较操作符肯定不行&#xff0c;必须使用通配符。利用通配符可以创建比较特定数据的搜索模式。 通配符&#xff1a;用来匹配值的一部分的特殊字符…

【数据分享】1999—2021年地级市市政公用事业和邮政、电信业发展情况相关指标(Excel/Shp格式)

在之前的文章中&#xff0c;我们分享过基于2000-2022年《中国城市统计年鉴》整理的1999-2021年地级市的人口相关数据、各类用地面积数据、污染物排放和环境治理相关数据、房地产投资情况和商品房销售面积、社会消费品零售总额和年末金融机构存贷款余额、地方一般公共预算收支状…

减轻 PWM 的滤波要求

经典脉宽调制器 (PWM) 发出 H 个连续逻辑高电平&#xff08;1&#xff09;&#xff0c;后跟 L 个连续逻辑低电平&#xff08;0&#xff09;的重复序列。每个高电平和低电平持续一个时钟周期 T 1/F (Hz)。结果的占空比可定义为 H/N&#xff0c;其中 N HL 时钟周期。N 通常是 2…

keil固件库的安装 库函数的配置

文章目录&#xff1a; 第一步&#xff1a;下载固件库文件 第二步&#xff1a;创建一个新的文件夹&#xff0c;用来保存固件库文件。在该文件夹下新建文件夹&#xff1a;CMSIS、Lib、Startup、User 第三步&#xff1a;把库文件中文件放入我们建立对应的文件中 1.CMSIS模块 …

【MIPI协议 C-PHY详解】

MIPI协议 C-PHY详解 前言一、C-PHY介绍1.1 C-PHY 与 D-PHY wire的区别1.2 3 wire的状态&#xff08;states&#xff09;变化1.3 C-PHY Data Mapping Between 7 Symbols and a 16-Bit Data1.3 C-PHY Lane States and Line Levels ~ LP Mode 二、C-PHY LP Package Format2.1 C-PH…

SQLI_LABS攻击

目录 Less1 首先来爆字段 联合注入 判断注入点 爆数据库名 爆破表名 information_schema information_schmea.tables group_concat() 爆破列名 information_schema.columns 爆值 SQLMAP Less-2 -4 Less -5 布尔 数据库 表名 字段名 爆破值 SQLMAP Less-6 …

​LeetCode解法汇总142. 环形链表 II

目录链接&#xff1a; 力扣编程题-解法汇总_分享记录-CSDN博客 GitHub同步刷题项目&#xff1a; https://github.com/September26/java-algorithms 原题链接&#xff1a; 力扣 描述&#xff1a; 给定一个链表的头节点 head &#xff0c;返回链表开始入环的第一个节点。 如…

ssl单向证书和双向证书校验测试及搭建流程

零、前提准备 说明&#xff1a; 50.50.1.118作为服务端&#xff0c;系统是 linux&#xff0c;openssl版本是&#xff1a;OpenSSL 1.1.1f 31 Mar 2020。 50.50.1.116是客户端&#xff0c;系统是Windows&#xff0c;openssl版本是&#xff1a;OpenSSL 3.0.5 5 Jul 2022 (Library…

Day07-作业(MySQL查询设计)

作业1: 根据如下需求完成SQL语句的编写 【仔细阅读题目需求】 数据准备&#xff1a; 创建一个数据库 db02_homework 执行如下SQL语句&#xff0c;准备测试数据 -- 员工管理(带约束) create table tb_emp (id int unsigned primary key auto_increment comment ID,username …

这款轻量级规则引擎,真香!

大家好&#xff0c;我是老三&#xff0c;之前同事用了一款轻量级的规则引擎脚本AviatorScript&#xff0c;老三也跟着用了起来&#xff0c;真的挺香&#xff0c;能少写很多代码。这期就给大家介绍一下这款规则引擎。 简介 AviatorScript 是一门高性能、轻量级寄宿于 JVM &…

阿里云率先荣获容器集群稳定性先进级认证

7 月 25 日&#xff0c;由中国信通院发起的“2023 稳保体系”评估结果在可信云大会现场公布&#xff0c;阿里云容器服务 ACK 成为首批通过“云服务稳定运行能力-容器集群稳定性”评估的产品&#xff0c;并荣获“先进级”认证。 云原生技术正在激活应用构建新范式&#xff0c;构…

数据结构初阶--树和二叉树的概念与结构

目录 一.树 1.1.树的概念 1.2.树的相关概念 1.3.树的表示 1.4.树在实际中的运用 二.二叉树 2.1.二叉树的概念 2.2.特殊的二叉树 满二叉树 完全二叉树 2.3.二叉树的性质 2.4.二叉树的存储结构 顺序存储 链式存储 一.树 1.1.树的概念 树是一种非线性的数据结构&am…

【SVN】merge 合并trunk branch代码,解决冲突

在命令行模式下进入待merge的项目根目录 1.将指定url上的代码merge到本地当前文件夹下&#xff08;--dry-run表示test merge&#xff09; E:\project\ry\trunk\ees-tem>svn merge http://192.168.0.2/svn/repo/ProD/JinZay/EE S/code/ees-tem/branches/develop -c 1149491 …