说明
- 合并多个源Observable的事件,事件不是按照顺序被发射(如需顺序使用concat操作符)。
- Error事件被延迟发射,针对的是源Observable中的Error事件,多个源Observable都有Error时,会合并Error事件。
- 执行结束
- 正常执行所有事件,onComplete()代表执行结束。
- 有Error事件时,onError()代表执行结束。
Error事件位置以及类型
Error事件需要在源Observable中才会被延迟发射。
一个源Observable 有Error事件,onError()中接收到的是实际的错误类型,不是CompositeException类型。
多个源Observable 有Error事件时,onError()中接收到的CompositeException类型,此类型是个数组。
案例
错误不在源Observable中,不会被延迟发射
Observable<Integer> odds = Observable.just(1, 3, 5);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.mergeDelayError(odds, evens)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//此处会产生错误,错误在mergeDelayError后面,没有在源Observable中(odds,evens)中,
//Error不会被延迟
if (integer == 3) return null;
return integer;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "接收到异常");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});
错误在源Observable中,会被延迟发射
Observable<Integer> odds = Observable.just(1, 3, 5)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//此处会产生错误,错误在源Observable中,会被延迟
if (integer == 3) return null;
return integer;
}
});
Observable<Integer> evens = Observable.just(2, 4, 6)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer;
}
});
Observable.mergeDelayError(odds, evens)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});
多个源Observable中有Error,onError()中的类型是CompositeException
Observable<Integer> odds = Observable.just(1, 3, 5)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//此处会产生错误,错误在源Observable中,会被延迟
if (integer == 3) return null;
return integer;
}
});
Observable<Integer> evens = Observable.just(2, 4, 6)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//此处会产生错误,错误在源Observable中,会被延迟
if (integer == 4) return null;
return integer;
}
});
Observable.mergeDelayError(odds, evens)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
if (e instanceof CompositeException) { //源Observable 异常
CompositeException compositeException = (CompositeException) e;
List<Throwable> execptionList = compositeException.getExceptions();
Log.e(TAG, "源Observable 异常数量" + execptionList.size());
}
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});
线程切换位置
线程切换需要在源Observable中完成。
线程切换不在源Observable中执行,源Observable中没有Error事件时正常,如果放在操作符后,会影响Error事件的延迟。
案例
源Observable中没有Error事件,线程切换在mergeDelayError操作符后。(正常)
Observable<Integer> odds = Observable.just(1, 3, 5)
.subscribeOn(Schedulers.io()) //子线程执行 源Observable
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.e(TAG, "改变:" + Thread.currentThread());
return integer+1;
}
});
Observable<Integer> evens = Observable.just(2, 4, 6)
.subscribeOn(Schedulers.io()) //子线程执行 源Observable
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.e(TAG, "改变:" + Thread.currentThread());
return integer+1;
}
});
Observable.mergeDelayError(odds, evens)
.observeOn(AndroidSchedulers.mainThread()) // 非缘Observable中切换主线程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});
源Observable中有Error事件,线程切换在mergeDelayError操作符后。(异常)
只能接受到一个异常数据,接收不到其他事件的数据
Observable<Integer> odds = Observable.just(1, 3, 5)
.subscribeOn(Schedulers.io()) //子线程执行 源Observable
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.e(TAG, "改变:" + Thread.currentThread());
if (integer == 3) return null; //此处发生错误
return integer+1;
}
});
Observable<Integer> evens = Observable.just(2, 4, 6)
.subscribeOn(Schedulers.io()) //子线程执行 源Observable
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.e(TAG, "改变:" + Thread.currentThread());
if (integer == 2) return null; //此处发生错误
return integer+1;
}
});
Observable.mergeDelayError(odds, evens)
.observeOn(AndroidSchedulers.mainThread()) //切换主线程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});
线程切换在源Observable执行,源Observable中无Error事件(正常)
Observable<Integer> odds = Observable.just(1, 3, 5)
.subscribeOn(Schedulers.io()) //源Observable 切换线程
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
// Log.e(TAG, "改变:" + Thread.currentThread());
return integer+1;
}
})
.observeOn(AndroidSchedulers.mainThread());//源Observable 切换线程
Observable<Integer> evens = Observable.just(2, 4, 6)
.subscribeOn(Schedulers.io())//源Observable 切换线程
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
// Log.e(TAG, "改变:" + Thread.currentThread());
return integer+1;
}
})
.observeOn(AndroidSchedulers.mainThread()); //源Observable 切换线程
Observable.mergeDelayError(odds, evens)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});
线程切换在源Observable执行,源Observable中有Error事件(正常)
Observable<Integer> odds = Observable.just(1, 3, 5)
.subscribeOn(Schedulers.io()) //源Observable 切换线程
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
// Log.e(TAG, "改变:" + Thread.currentThread());
if (integer == 3) return null;//此处发生错误
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread()); //源Observable 切换线程
Observable<Integer> evens = Observable.just(2, 4, 6)
.subscribeOn(Schedulers.io())//源Observable 切换线程
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
// Log.e(TAG, "改变:" + Thread.currentThread());
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread());//源Observable 切换线程
Observable.mergeDelayError(odds, evens)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "接收到异常" + " Thread" + Thread.currentThread());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});
异常处理
源Observable的异常,单独处理,使用源Observable的doOnError(),合并处理使用订阅者的doOnError()。
Observable<Integer> odds = Observable.just(1, 3, 5)
.subscribeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
if (integer == 3) return null;//此处有错误
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnError(new Consumer<Throwable>() { //异常被捕获,该Observable其他事件不会被接收
@Override
public void accept(Throwable throwable) throws Exception {
isError1 = true;
Log.e(TAG, "接收到异常1");
}
});
Observable<Integer> evens = Observable.just(2, 4, 6)
.subscribeOn(Schedulers.io())
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer;
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnError(new Consumer<Throwable>() { //此处不会执行,没有异常,所有事件会被正常接收
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "接收到异常2");
}
});
Observable.mergeDelayError(odds, evens)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "接收到数据:" + integer + " Thread" + Thread.currentThread());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable e) throws Exception {
//只要源Observable有异常,不管有没有在源Observable中被捕获,此处都有会执行
Log.e(TAG, "接收到所有异常" + " Thread" + Thread.currentThread());
Log.e(TAG, "isError1 = " + isError1);
if (e instanceof CompositeException) { //源Observable 异常
CompositeException compositeException = (CompositeException) e;
List<Throwable> execptionList = compositeException.getExceptions();
Log.e(TAG, "源Observable 异常数量" + execptionList.size());
}
}
}, new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "接收完成" + " Thread" + Thread.currentThread());
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe" + " Thread" + Thread.currentThread());
}
});