Java并发编程之ReactiveSteams

news2024/9/21 16:16:16

Reactive Steams

一、Reactive Steams介绍

在聊Reactive Steams之前,先了解一下Reactive Programming(反应式/响应式编程)。为了解决异步编程中出现的各种问题,程序员们提出了各种的思路去解决这些问题,这些解决问题的方式,方法,手段就可以叫做Reactive Programming。

Reactive Programming是一种编程思想,类似面向对象,函数式编程。

本质上是对数据流或某种变化做出的反应,这个变化什么时候触发是未知的,所以他是一种基于异步、回调的方式在处理问题。

当越来越多的程序员,开始使用这种编程思想时,需要一些大佬来统一一个思想规范。是国外的几个大佬公司启动的了Reactive Steams项目。Netflix、Pivotal、Lightbend联合来为异步流处理提供标准,规范,他提供了对运行时环境以及网络的工作。

Reactive Steams翻译过来就是响应式/反应式流。其实是一种基于异步流处理的标准化规范,目的是在使用流处理更加可靠,高效和响应式。

二、Java层面的Reactive Steams

基于这个规范的实现很多,比如三方库中比较出名的RxJava,Reactor等等。

但是JDK8版本中,Java已经有了CompletableFuture的支撑,我们可以将大量的异步任务做好编排。但是在JDK8版本中的CompletableFuture依然有很多特性无法支撑。所以在JDK9,CompletableFuture做了很多的更新,比如支持延迟,超时,子类化之类功能。

这时,咱们会发现,其实CompletableFuture已经可以去支撑做一些异步编程的操作了。但是为什么很多大公司依然还是使用RxJava,Reactor这种三方依赖库呢?

问题在于,大多数的时候,咱们的采用异步编程处理的任务并不是非常复杂的。这个时候,咱们确实不需要去使用Reactive Steams反应流的框架。如果系统越来越复杂,或者你处理的业务本身就是及其复杂的那种,你就要去写一个让人头皮发麻的代码了。随着时间的推移,这种代码会变成非常难以维护。

其次CompletableFuture并不是真正的基于Reactive Steams去实现。CompletableFuture描述的是单次执行的结果。尽管可以通过各种方法将异步任务之间构建成一串任务组成的流程图,本质上依然是单词结果。

反应式流,面向的是Stream。 咱们Java中的Stream API更类似Reactive Steams的思想。Stream API是同步阻塞的。

最经典的就是CompletableFuture无法处理Reactive Steams中的一个核心概念,back pressure(背压,反压,回压),比如在上下游承载能力不同时,比如下游玩不转了,需要告知上游采取一些策略去解决。CompletableFuture明显无法处理这种。

其次还有Java中提供的回调,Future机制在实现响应式编程中,问题和缺点都比较难处理。有个比较出名的概念叫做callback hell(回调地狱)。简单来说就是回调里面套回调,虽然将子过程做到解耦,但是随着业务的负责,回调代码的可读性、复杂性就大大的增加,这个就是快乐的回调地狱。

所以,咱们需要一套框架或者说类库来实现真正响应式流,大概需要几个特性:

  • 支持将异步任务做封装以及组装,需要API对异步任务进行包装,并且需要很多子任务来对异步操作进行链式组装,过程中包括过滤,异常处理,超时等等操作。
  • 减少异步任务的嵌套,减少代码的复杂性,增加可读性,避免callback hell这种及其复杂恶心的代码。
  • 支持背压back pressure,也就需要有上游和下游的概念,可以做到协商处理数据流的速度。

三、Java层面Reactive Steams的API

首先Reactive Steams响应流实现方式其实是基于观察者模式的扩展,同时也能看到发布订阅模式,责任链模式等等。

整个Reactive Steams流程大致如下。

image.png

直接在JDK9版本之上查看Doug Lee提供的Flow类。(我的是JDK11)。

在Flow类中,提供了核心的四个接口。

Publisher,Subscriber,Subscription,Processor

image.png

Publisher:Publisher是函数式接口,负责发布数据的。 Publisher内部有一个方法subscribe方法,去和具体的订阅者绑定关系。

image.png

Subscriber:Subscriber是订阅者,负责订阅,消费数据。

四个方法:

  • onSubscribe:定于成功后触发,并且表明可以开始接收发布者的数据元素了。
  • onNext:每次获取到发布者的数据元素都会执行onNext。
  • onError:接收数据元素时,出现异常等问题时,走onError。
  • onComplete:当指定接收的元素个数搞定后,触发onComplete。

image.png

Subscription:发布者和订阅者是基于Subscription关联的。当建立了订阅的关系后,发布者会将Subscription传递给订阅者。订阅者指定获取元素的数量和取消订阅操作,都要基于Subscription去操作。

提供了两个方法:

  • request:订阅者要获取的元素个数。
  • cancel:取消订阅,当前的订阅者不接收当前发布者的元素。

image.png

Processor:Processor继承了Publisher和Subscriber,即是发布者也是订阅者。Processor一般作为数据的中转,订阅者处理完数据元素,可以再次发给下一个订阅者。

image.png

这四个接口很重要,是Reactive Streams的规范,但是可以明显的看到,内部没有具体的内容实现。

这里就类似JDBC这种规范,规范在JDK9中提出来了,想实现,可以基于当前的这个四个接口再做具体的逻辑处理以及实现的细节。

四、Java层面Reactive Steams基本操作

咱们测试Java中的Flow里提供的API时,就是走最基本的操作。

其中Processor不需要重写,玩最基本的操作,不去做订阅者和发布者的转换。

其次Subscription也不需要重写,这东西就是提供了订阅者指定订阅的消息个数,以及取消的操作。

然后Publisher需要重写,但是JDK中已经提供了一个Publisher的实现,SubmissionPublisher,可以直接使用。

最后,Subscriber需要咱们自己重写,指定好订阅消息的个数,已经消费的一些逻辑

public class MySubscriber implements Flow.Subscriber<Integer> {

    private Flow.Subscription subscription;

    @Override
    // 绑定好订阅关系后,就会触发这个方法
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(10);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println(Thread.currentThread().getName() + ":接收到数据流:" + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(Thread.currentThread().getName() + ":接收消息出现异常:" + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println(Thread.currentThread().getName() + ":当前订阅者要求接收的消息全部处理完毕。");
    }
}

直接使用SubmissionPublisher测试整体效果

public static void main(String[] args) {
    // 只有一个工作线程的线程池
    ExecutorService executor = Executors.newFixedThreadPool(1);
    // 指定缓冲区的大小
    int maxBufferCapacity = 5;

    // 需要指定两个参数
    // 第一个参数需要传递一个线程池,指定订阅者使用的线程
    // 第二个参数,需要指定一个缓冲区,发布者发布消息后,消息会扔到缓冲区里。
    SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor,maxBufferCapacity);

    // 绑定订阅者
    MySubscriber subscriber = new MySubscriber();
    publisher.subscribe(subscriber);

    // 发布消息
    for (int i = 0; i < 10; i++) {
        System.out.println(Thread.currentThread().getName() + ":发布消息:" + i);
        publisher.submit(i);
    }

    // 释放资源
    publisher.close();
    executor.shutdown();
}

最终效果:image.png

缓冲区: 缓冲区就是发布者和订阅者之间的一块内存,类似线程池中的阻塞队列,可以将消息扔到这个缓存区里。其次咱们设置的缓冲区大小是5,但是发现get出来的时候,5被替换为了8。这边也是SubmissionPublisher为了更有效的使用内存,默认会基于roundCapacity方法将传递的缓冲区大小替换为2的n次幂。

缓冲区大小为4,触发可以发送5条,1条被订阅者拿走执行onNext,剩下的4个会存储在缓冲区内部

image.png

背压效果: 当订阅者指定的消息已经全部处理完毕后,发布者最多只能发布缓冲区大小个数的消息,剩下的内容会基于背压的效果直接暂时不发送。

订阅者触发onComplete: 需要你发布者做了close操作,确认了发布者已经将消息全部发送,并且订阅者也已经将全部的消息处理完毕后,才会触发onComplete

订阅者的subscription的使用: 订阅者可以在onNext或者其他方法中动态的使用subscription去指定后续需要几个消息订阅,已经是否需要取消订阅消息等操作。

五、Reactive Steams落地体验

5.1、回调地狱问题

前面的方式大致了解啦JDK9中更新的Reactive Streams的规范,咱们实现也仅仅是看到了发布订阅和回压的效果。并没有看到如何解决回调地狱的问题。咱们可以通过Spring5官网提供的一个例子,来体验一下CallBack Hell回调地狱带来的问题。后面再根据三方的实现来看一下基于Reactive Streams实现后效果如何。这里基本是根据伪代码走的。

例子:在用户的UI页面上,展示当前用户最喜欢的Top5的商品详情。这里会根据用户的ID去查询当前用户Top5商品的ID,如果ID可以查询到之后再根据商品的ID去查询商品的详情。如果当前用户ID查询的结果不存在喜欢的Top商品,没有的话,通过推荐服务查询Top5的商品信息。展示给用户。

当前例子需要三个服务的支撑:

  • 根据用户ID查询用户的Top5商品ID
  • 根据Top5商品ID查询商品详情
  • 调用推荐服务,获取5个商品详情

基于Java最原生的异步编程方式,实现上述操作,查看到底什么是回调地狱。。。

准备了商品详情的实体类。

@Data
public class Fav {

    private String itemId;

    private String itemName;

    private String itemDetail;

}

准备了回调方法,拿到结果后触发

public interface Callback<T> {

    void onSuccess(T t);

    void onError(Throwable throwable);

}

准备了访问三个服务的Service接口

public interface UserService {

    /**
     * 根据用户Id查询用户的Top5商品Id
     * @param userId
     * @param list
     */
    void getFav(String userId, Callback<List<String>> list);

}

public interface ItemService {


    /**
     * 根据商品Id查询商品的详情
     * @param itemId
     * @param callback
     */
    void getDetail(String itemId,Callback<Fav> callback);
}

public interface SuggestionService {

    /**
     * 调用推荐服务,获取推荐商品
     * @param favs
     */
    void getSuggestion(Callback<List<Fav>> favs);
}

准备了响应数据的UI线程工具以及响应方法

public class UiUtils {

    public static void submitOnUiThread(Runnable runnable){
        // 线程池中的线程做响应的操作………………
    }


    public static void show(Object obj){
        // 利用UI线程展示具体数据
    }

    public static void error(Object obj){
        // 出现错误响应的内容
    }


}

完成了Controller中的异步编程效果

@RestController
public class CallBackHellController {

    @Autowired
    private UserService userService;

    @Autowired
    private ItemService itemService;

    @Autowired
    private SuggestionService suggestionService;


    @GetMapping("/callbackhell")
    public void callbackHell(String userId){
        //1、调用用户服务,查询Top5商品Id
        userService.getFav(userId, new Callback<List<String>>() {
            @Override
            public void onSuccess(List<String> list) {
                // 已经查询到商品Id,但是不知道是否有值
                if (list.isEmpty()){
                    // 3、用户没有Top5商品Id,通过推荐服务查询推荐商品详情
                    suggestionService.getSuggestion(new Callback<List<Fav>>(){
                        @Override
                        public void onSuccess(List<Fav> favs) {
                            // 推荐服务查询到了商品详情,响应即可
                            UiUtils.submitOnUiThread(() -> {
                                favs.stream().limit(5).forEach(UiUtils::show);
                            });
                        }
                        @Override
                        public void onError(Throwable throwable) {
                            UiUtils.error(throwable);
                        }
                    });

                }
                else{
                    // 2、通过用户查询到了Top5商品Id,通过商品Id查询商品详情
                    list.stream().limit(5).forEach(itemId -> itemService.getDetail(itemId,new Callback<Fav>(){

                        @Override
                        public void onSuccess(Fav fav) {
                            // 查询到了商品详情,利用UI线程,给客户端响应数据
                            UiUtils.submitOnUiThread(() -> UiUtils.show(fav));
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            // 出现异常了。
                            UiUtils.error(throwable);
                        }
                    }));
                }
            }
            @Override
            public void onError(Throwable throwable) {
                // 出现异常了。
                UiUtils.error(throwable);
            }
        });


    }

}

5.2、解决回调地狱问题

这里为了解决回调地狱问题,需要一个Reactor的依赖来帮助咱们实现异步编程。

需要导入依赖

<!--        reactor的核心依赖-->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.17.RELEASE</version>
</dependency>

不能再使用之前的Callback方式了。需要使用reactor提供的Flux,并且这种链式操作会更直观,也更好维护。就需要修改三个服务对应的Service。

public interface UserService {

    /**
     * 根据用户ID查询Top5商品ID
     * @param userId
     * @return
     */
    Flux<List<String>> getFav(String userId);

}

public interface ItemService {


    /**
     * 根据商品ID查询商品详情
     * @param itemId
     * @return
     */
    Flux<Fav> getDetail(String itemId);

}

public interface SuggestionService {

    /**
     * 获取推荐的商品详情
     * @return
     */
    Flux<List<Fav>> getSuggestion();

}

就可以利用Flux提供的API来解决之前回调地狱的问题。

@RestController
public class ReactorCallbackController {

    @Autowired
    private UserService userService;

    @Autowired
    private ItemService itemService;

    @Autowired
    private SuggestionService suggestionService;


    @GetMapping("reactorcallback")
    public void reactorCallback(String userId){
        userService
                .getFav(userId)   // 根据用户Id查询Top5商品Id
                .flatMap(itemService::getDetail)  // 根据商品ID查询商品详情
                .switchIfEmpty(suggestionService.getSuggestion())  // 如果前面为null,这里通过推荐服务查询商品详情
                .take(5)    // 获取前5个数据
                .publishOn(UiUtils.reactorOnUiThread())    // 使用Ui线程
                .subscribe(UiUtils::show,UiUtils::error);   // 成功走show,失败走error
    }

}

5.3、CompletableFuture的异步编程

Future的形式相比Callbacl效果要好一些,虽然JDK8和9都对CompletableFuture做了各种优化,但是他的表现还是不太好。多个Future在嵌套时,可读性还是比较差的。并且CompletableFuture不存在什么回压,或者是延迟调用的功能。

现在借助CompletableFuture来实现一个场景。

1、获取一个用户ID的列表。

2、通过用户ID分别获取他的名字以及统计信息。(希望这两个操作是并行执行的)

3、当两个信息都获取到之后,封装成一个普通字符串即可。

4、响应数据,最后拿到结果(输出一下)

实现的代码

public class GetNameAndStatTestByCF {

    public static void main(String[] args) {
        // 1、获取一组用户ID列表
        CompletableFuture<List<String>> idList = getID();

        CompletableFuture<List<String>> dataCompletableFuture = idList.thenComposeAsync(ids -> {

            Stream<CompletableFuture<String>> resultStream =  ids.stream().map(id -> {
                // 2、并行基于ID查询名称信息
                CompletableFuture<String> nameTask = getName(id);
                // 2、并行基于ID查询统计信息
                CompletableFuture<Integer> statTask = getStat(id);
                // 让两个查询名称信息和查询统计信息操作并行执行
                return nameTask.thenCombineAsync(statTask,(name,stat) -> {
                    // 3、拿到信息组装
                    return "Name:" + name + ",Stat:" + stat;
                });
            });
            // 将resultStream转换成一个数组
            List<CompletableFuture<String>> resultList = resultStream.collect(Collectors.toList());
            // 全部的任务封装起来
            CompletableFuture<Void> allDone = CompletableFuture.allOf(resultList.toArray(new CompletableFuture[]{}));
            // 执行全部任务
            return allDone.thenApplyAsync(v -> resultList.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
        });

        // 4、获取全部的组件信息后响应客户端(输出)
        List<String> data = dataCompletableFuture.join();
        System.out.println(data);
    }

    // 模拟zz服务获取统计信息
    private static CompletableFuture<Integer> getStat(String id) {
        return CompletableFuture.supplyAsync(() -> {
            return 666;
        });
    }
    // 模拟yy服务获取名称信息
    private static CompletableFuture<String> getName(String id) {
        return CompletableFuture.supplyAsync(() -> {
            return "张三";
        });
    }
    // 模拟xx服务,获取一组用户ID
    private static CompletableFuture<List<String>> getID() {
        return CompletableFuture.supplyAsync(() -> {
            // 模拟查询三方服务
            List<String> list = new ArrayList<>();
            list.add("1");
            list.add("2");
            list.add("3");
            return list;
        });
    }

}

5.4、解决CompletableFuture的问题

CompletableFuture可以实现一些简单的异步编程,但是可看性和维护性以后后期的扩展都需要对整体代码做比较大成本的维护。依然采用Reactor来实现一个一模一样的逻辑,再看代码效果。

public class GetNameAndStatByReactor {

    public static void main(String[] args) {
        // 1、获取一组用户ID列表
        Flux<String> idFlux = getId();

        Flux<String> result = idFlux.flatMap(id -> {
            // 2、并行基于ID查询名称信息
            Flux<String> nameFlux = getName(id);
            // 2、并行基于ID查询统计信息
            Flux<Integer> statFlux = getStat(id);
            // 俩任务并行处理完毕,触发3
            return nameFlux.zipWith(statFlux, (name, stat) -> {
                // 3、拿到信息组装
                return "Name:" + name + ",Stat:" + stat;
            });
        });
        Mono<List<String>> listMono = result.collectList();
        List<String> info = listMono.block();
        // 4、获取全部的组件信息后响应客户端(输出)
        System.out.println(info);
    }

    private static Flux<Integer> getStat(String id) {
        // 会查询三方服务,然后封装结果
        return Flux.just(888);
    }

    private static Flux<String> getName(String id) {
        // 会查询三方服务,然后封装结果
        return Flux.just("张三");
    }


    private static Flux<String> getId() {
        // 会查询三方服务,然后封装结果
        return Flux.just("1","2","3");
    }
}

六、RxJava2实现异步编程

RxJava是一个小框架,或者是依赖库。在RxJava的1.x版本中,他并不基于ReactiveStreams去实现的。没有关系,因为RxJava的2版本,就是基于ReactiveStreams实现的了。

使用RxJava巨简单,因为作者想将RxJava尽量做到轻量级,就一个依赖。

<!--        RxJava2的依赖-->
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

6.1 RxJava2的入门操作

获取一个Person对象的集合,将Person集合中的所有年龄大于10岁的Person对象删选出来,并输出他的名字。

采用RxJava来实现一波。

public class Demo1 {

    public static void main(String[] args) {
        //1、获取Person对象集合
        List<Person> personList = getPersonList();

        //2、完成上面要求的操作
        //2.1、将person集合转换为RxJava的流
        Flowable.fromArray(personList.toArray(new Person[]{}))
                //2.2 过滤年龄大于10岁的
                .filter(person -> person.getAge() > 10)
                //2.3 获取筛选后的Person名称
                .map(person -> person.getName())
                //2.4 输出Name
                .subscribe(System.out::println);
    }

    private static List<Person> getPersonList() {
        List<Person> personList = new ArrayList<>();
        personList.add(new Person("大娃",5));
        personList.add(new Person("二娃",7));
        personList.add(new Person("三娃",9));
        personList.add(new Person("四娃",11));
        personList.add(new Person("五娃",13));
        return personList;
    }
}

6.2 RxJava2的基础处理流程

在RxJava中有三个核心的角色

  • 被观察者(Observable)
  • 观察者(Observer)
  • 订阅(Subscribe)
/**
 * 采用RxJava2 ,来个最基本的操作
 * @author zjw
 * @description
 */
public class Demo2 {

    public static void main(String[] args) {
        //1、构建Observable
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("World");
                emitter.onComplete();
            }
        });

        //2、构建Observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("开始订阅");
            }

            @Override
            public void onNext(String s) {
                System.out.println("观察者:" + s);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("订阅结束");
            }
        };

        //3、订阅
        observable.subscribe(observer);
    }

}

6.3 创建操作符

创建操作符的API还是比较多的,大致都介绍一下。

6.3.1 create

创建被观察者对象,传入ObservableOnSubscribe实现,指定数据流

具体实现在6.2中有。

6.3.2 just

可以在创建被观察者的同时,发送数据流内容

public static void main(String[] args) {
    //1、构建被观察者,同时订阅观察者
    Observable.just("大娃", "二娃", "三娃", "四娃")
              .subscribe(new Observer<String>(){
                  @Override
                  public void onSubscribe(Disposable d) {
                      System.out.println("开始订阅");
                  }

                  @Override
                  public void onNext(String s) {
                      System.out.println("观察者:" + s);
                  }

                  @Override
                  public void onError(Throwable e) {
                      e.printStackTrace();
                  }

                  @Override
                  public void onComplete() {
                      System.out.println("订阅结束");
                  }
              });

}

另外,也将Observer的构建简化一下,可以直接使用consumer只编写onNext的逻辑

/**
 * 前面的Demo3中指定observer太麻烦。
 * @author zjw
 * @description
 */
public class Demo4 {

    public static void main(String[] args) {
        //1、构建被观察者,同时订阅观察者
        Observable.just("大娃", "二娃", "三娃", "四娃")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("观察者:" + s);
                    }
                });
    }
}
6.3.3 fromArray

前面聊的just,最多可以传递10个事件/流。 而fromArray入参直接是一个可变参。

public class Demo5 {
    public static void main(String[] args) {
        Observable.fromArray("大娃","二娃","三娃")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("观察者:" + s);
                    }
                });
    }
}
6.3.4 fromCallable

可以基于fromCallable传递一个函数,基于函数计算出来的返回结果,就是咱们被观察者要传递的事件/流

public class Demo6 {

    public static void main(String[] args) {
        Observable.fromCallable(() -> {
            return "Done!";
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("观察者:" + s);
            }
        });
    }
}
6.3.5 timer

timer方法的做到指定之间之后,发送一个 0L 的值,给观察者。这个0L不能动,他的目的就是触发观察者去执行一下他的函数,做操作。
F

public class Demo7 {

    public static void main(String[] args) throws IOException {
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        // 在调用timer时,默认会传递一个Schedulers.computation()的线程池,这个线程池中的线程,是守护线程
                        // 所以在最开始,没有查看到观察者的输出。
                        System.out.println(Thread.currentThread().getName());
                        System.out.println("观察者:" + aLong);
                    }
                });
        // 让main函数的用户线程不要退出!
        System.in.read();
    }
}
6.3.6 interval

每隔一段时间,就会发送一个事件/流,发送的时间是从0开始递增,1,2,3,4。

public class Demo8 {

    public static void main(String[] args) throws IOException {
        Observable.interval(2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(Thread.currentThread().getName() + ":" + aLong);
                    }
                });

        System.in.read();
    }
}
6.3.7 intervalRange

和interval类似,但是Range的意思是指定发送的事件/流,从什么数值开始,以及具体的数量

public class Demo9 {

    public static void main(String[] args) throws IOException {
        Observable.intervalRange(100,4,0,2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(Thread.currentThread().getName() + ":" + aLong);
                    }
                });

        System.in.read();
    }
}
6.3.8 range&rangeLong

同一时间,发送事件的序列,指定开始和结束,包左不包右 [)

rangeLong就是传递的序列是Long类型的,而普通的range传递的是Integer

public class Demo10 {

    public static void main(String[] args) throws IOException {
        Observable.range(0,10)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(Thread.currentThread().getName() + ":" + integer);
                    }
                });

        System.in.read();
    }
}
6.3.9 never、error、empty
  • never:不发送任何事件
  • error:触发onError事件
  • empty:触发onComplete事件
public class Demo11 {

    public static void main(String[] args) {
//        Observable.never()
//        Observable.error(new RuntimeException("error事件"))
        Observable.empty()
                .subscribe(new Observer() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("开始订阅");
                    }

                    @Override
                    public void onNext(Object s) {
                        System.out.println("观察者:" + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("出现异常:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("订阅结束");
                    }
                });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1527678.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker init 生成Dockerfile和docker-compose.yml —— 筑梦之路

官网&#xff1a;https://docs.docker.com/engine/reference/commandline/init/ 简介 docker init是一个命令行实用程序&#xff0c;可帮助初始化项目中的 Docker 资源。.dockerignore它根据项目的要求创建 Dockerfile、Compose 文件。这简化了为项目配置 Docker 的过程&#…

Linux 文件系统:文件描述符、管理文件

目录 一、三个标注输入输出流 二、文件描述符fd 1、通过C语言管理文件—理解文件描述符fd 2、文件描述符实现原理 3、文件描述符0、1、2 4、总结 三、如何管理文件 1、打开文件的过程 2、内核空间的结构 struct task_struct&#xff08;PCB&#xff09; struct file…

html5播放flv视频

参考&#xff1a;flv-h265 - npmHTML5 FLV Player. Latest version: 1.7.0, last published: 6 months ago. Start using flv-h265 in your project by running npm i flv-h265. There are no other projects in the npm registry using flv-h265.https://www.npmjs.com/packag…

java类的定义及使用

1、类的定义 &#xff08;1&#xff09;类的重要性&#xff1a;是Java程序的基本组成单位&#xff1b; &#xff08;2&#xff09;类是什么&#xff1a;是对现实生活中一类具有共同属性和行为的事物的抽象&#xff0c;确定对象将会拥有的属性和行为&#xff1b; &#xff08…

卷积篇 | YOLOv8改进之C2f模块融合SCConv | 即插即用的空间和通道维度重构卷积

前言:Hello大家好,我是小哥谈。SCConv是一种用于减少特征冗余的卷积神经网络模块。相对于其他流行的SOTA方法,SCConv可以以更低的计算成本获得更高的准确率。它通过在空间和通道维度上进行重构,从而减少了特征图中的冗余信息。这种模块的设计可以提高卷积神经网络的性能。本…

AI时代,Matter如何融入与服务中国智能家居市场,助力中国企业出海?

随着智能家居产业的飞速发展&#xff0c;丰富多样的智能家居产品为消费者带来了便利的同时&#xff0c;因为不同品牌、不同产品之间的协议与标准不统一&#xff0c;导致消费者体验产生割裂&#xff0c;本来想买个“智能”家居&#xff0c;结果买了个“智障”家居&#xff0c;这…

C++开发基础——类对象与构造析构

一、基础概念 类&#xff1a;用户自定义的数据类型。 对象&#xff1a;类类型的变量&#xff0c;类的实例。 类的成员&#xff1a;成员变量和成员函数。 成员变量&#xff1a;类中定义的变量。 成员函数&#xff1a;类中定义的函数。 定义类的代码样例&#xff1a; class…

社区生活超市管理系统|基于JSP技术+ Mysql+Java+ Tomcat的社区生活超市管理系统设计与实现(可运行源码+数据库+设计文档)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 2024年56套包含java&#xff0c;ssm&#xff0c;springboot的平台设计与实现项目系统开发资源&#xff08;可…

【Unity动画】Unity如何导入序列帧动画(GIF)

Unity 不支持GIF动画的直接播放&#xff0c;我们需要使用序列帧的方式 01准备好序列帧 02全部拖到Unity 仓库文件夹中 03全选修改成精灵模式Sprite 2D ,根据需要修改尺寸&#xff0c;点击Apply 04 创建一个空物体 拖动序列上去 然后全选所有序列帧&#xff0c;拖到这个空物体…

python异常:pythonIOError异常python打开文件异常

1.python读取不存在的文件时&#xff0c;抛出异常 通过 open()方法以读“r”的方式打开一个 abc.txt 的文件&#xff08;该文件不存在&#xff09;&#xff0c;执行 open()打开一个不存在的文件时会抛 IOError 异常&#xff0c;通过 Python 所提供的 try...except...语句来接收…

Java基础经典10道题

目录 for循环的嵌套 题目一: 求101到200之间的素数的个数,并打印 代码分析: 注意点: 题目二:开发验证码 代码分析: 题目三:数组元素的复制 代码分析: 题目四:评委打分 健壮版代码: 代码分析:看源码 注意点: 题目五:数字加密 优化版代码: 代码分析: 题目六:数字…

HandyControl PropertyGrid及自定义编辑器

前提条件 项目引入对应HandyControl对应版本包。 使用案例 UI部分 <Window xmlns:hc"https://handyorg.github.io/handycontrol"><hc:TabControl><hc:TabItem Header"默认样式"><hc:PropertyGrid Width"380" SelectedO…

[RCTF2015]EasySQL ---不会编程的崽

今天也是sql注入的新类型---二次注入。不得不说花样真的多哦。 既然真的是sql注入了。那就不测试其他地方了。现在注册进去看一下界面 单纯的回显了名字。源代码里发现user.php。 可以修改密码&#xff1f;二次注入应该就在用户名这里了。因为修改密码时&#xff0c;用户名会被…

学习笔记Day11:初探Linux

Linux系统初探 Linux系统简介 发行版本Ubuntu/centOS&#xff0c;逻辑一样&#xff0c;都可以用。 服务器 本质是一台远程电脑&#xff0c;大多数服务器是Linux系统&#xff0c;通常使用命令行远程访问而不是桌面操作。LInux服务器允许多用户同时访问。NGS组学测序数据上游…

硬盘哨兵Hard Disk Sentinel Pro V6.20.0.0 便携版

Hard Disk Sentinel 是一款功能强大的硬盘监控和分析软件&#xff0c;专为 Windows 用户设计。它可以实时监测硬盘驱动器&#xff08;HDD&#xff09;、固态硬盘&#xff08;SSD&#xff09;、混合硬盘&#xff08;SSHD&#xff09;、NVMe SSD、RAID 数组和外部 RAID 盒子的健康…

pdf文件属性的删除

pdf文件属性的删除 投标过程中需要处理文件属性&#xff0c;特别是word文件属性以及pdf文件的处理 这里讲解pdf文件属性的处理 word处理在我的另外一个博客中&#xff0c;word文件属性的处理 https://ht666666.blog.csdn.net/article/details/134102504 一般用 adobe acroba…

MySQL—数据库导入篇

什么是数据库&#xff1f; 数据库是干啥的&#xff1f; 数据库&#xff08;Database&#xff09;是按照数据结构来组织、存储和管理数据的仓库。 MySQL属于哪一类数据库&#xff1f; MySQL是一种关系型数据库。所谓的关系型数据库&#xff0c;是建立在关系模型基础上的数据库&a…

LabVIEW飞行器螺旋桨性能测试与数据监控

LabVIEW飞行器螺旋桨性能测试与数据监控 开发LabVIEW的电动飞行器螺旋桨性能测试与数据监控系统&#xff0c;专门针对电动飞行器螺旋桨在运行过程中的性能测试和监控需求。通过采集转速、转矩、拉力和温度等关键参数&#xff0c;系统能够实时监测和分析螺旋桨的状态&#xff0…

深度学习_ResNet_5

ResNet学习目标 什么是ResNet为什么要引入ResNet&#xff1f;ResNet网络结构的特点利用ResNet完成图像分类 什么是ResNet&#xff1f; ResNet&#xff08;Residual Network&#xff09;是一种深度残差网络&#xff0c;由何凯明等人在2015年提出&#xff0c;是深度学习领域中一…

网络视频播放器|基于JSP技术+ Mysql+Java+ B/S结构的网络视频播放器设计与实现(可运行源码+数据库+设计文档)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 2024年56套包含java&#xff0c;ssm&#xff0c;springboot的平台设计与实现项目系统开发资源&#xff08;可…