Android RxJava3 原理浅析

news2025/2/22 18:06:22


  val retrofit =Retrofit.Builder()
        val api = retrofit.create(
            .subscribe(object : SingleObserver<MutableList<Repo>>{
                override fun onSubscribe(d: Disposable?) {
                    clv1.text = "onSubscribe"

                override fun onSuccess(t: MutableList<Repo>) {
                    clv1.text =t[0].name

                override fun onError(e: Throwable) {
                    clv1.text = e.message!!






如果在创建 RxJava3CallAdapterFactory 时已经指定了 Scheduler(或使用其异步模式),那么调用链上的 .subscribeOn(...) 就不需要了


private RxJava3CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;


Single.just 发送瞬时事件

            .subscribe(object : SingleObserver<String?>{
                override fun onSubscribe(d: Disposable?) {
                    TODO("Not yet implemented")

                override fun onSuccess(t: String?) {
                    TODO("Not yet implemented")

                override fun onError(e: Throwable?) {
                    TODO("Not yet implemented")



 public static <@NonNull T> Single<T> just(T item) {
        Objects.requireNonNull(item, "item is null"); //先判断空
        return RxJavaPlugins.onAssembly(new SingleJust<>(item)); 

 public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
        Function<? super Single, ? extends Single> f = onSingleAssembly;
        if (f != null) {
            return apply(f, source);
        return source;

 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.single;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;

    protected void subscribeActual(SingleObserver<? super T> observer) {


    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed()); //产生订阅 然后丢弃  是个枚举类
        observer.onSuccess(value); //成功  ,没有失败的回调因为不可能失败 因为是一个可用对象

操作符: map,转换对象类型

 val single: Single<Int> = Single.just(1)
        val singleString = : Function<Int,String>{
            override fun apply(t: Int): String {
                return t.toString()


多个single map

按时间发送数据: 从0开始发送,间隔1秒

            .subscribe(object : Observer<Long>{
                override fun onSubscribe(d: Disposable?) {
                    TODO("Not yet implemented")

                override fun onNext(t: Long?) {
                   clv1.text= t!!.toString()

                override fun onError(e: Throwable?) {
                    TODO("Not yet implemented")

                override fun onComplete() {
                    TODO("Not yet implemented")

 public static Observable<Long> interval(long initialDelay, long period, @NonNull TimeUnit unit) {
        return interval(initialDelay, period, unit, Schedulers.computation());


 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;

public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;

    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is); //传入

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);

    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

        private static final long serialVersionUID = 346773832286157679L;

        final Observer<? super Long> downstream;

        long count;

        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;

        public void dispose() {

        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;

        public void run() {
            if (get() != DisposableHelper.DISPOSED) {

        public void setResource(Disposable d) {
            DisposableHelper.setOnce(this, d);

 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;

public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;

    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);

//实现了Disposable 然后extends AtomicReference引用,线程安全的Disposable
    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

        private static final long serialVersionUID = 346773832286157679L;

        final Observer<? super Long> downstream;

        long count;

        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;

        public void dispose() {
            DisposableHelper.dispose(this); //调用内部的dis

        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;

        public void run() {
            if (get() != DisposableHelper.DISPOSED) {

        public void setResource(Disposable d) {
            DisposableHelper.setOnce(this, d);

 // dispose
 public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get(); //拿到内部的
        Disposable d = DISPOSED;
        if (current != d) { //如果还没有被取消(当前值不是 DISPOSED)
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                return true;
        return false;

//setResource  把内部值设为传入的值 ,只设置一次
    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        Objects.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            if (field.get() != DISPOSED) {
            return false;
        return true;

实际设置定时任务的代码: ObservableInterval->subscribeActual

Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
public void run() {
    if (get() != DisposableHelper.DISPOSED) {
ObservableInterval 内部通过 IntervalObserver 来维护定时任务的创建和取消


public final Single<T> delay(long time, @NonNull TimeUnit unit) {
    return delay(time, unit, Schedulers.computation(), false);
 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;

public final class SingleDelay<T> extends Single<T> {

    final SingleSource<? extends T> source;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    final boolean delayError;

    public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        this.source = source;
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;

    protected void subscribeActual(final SingleObserver<? super T> observer) {

        final SequentialDisposable sd = new SequentialDisposable(); //内部维护
        observer.onSubscribe(sd); //下游的sd
        source.subscribe(new Delay(sd, observer)); //传递到上游

    final class Delay implements SingleObserver<T> {
        private final SequentialDisposable sd;
        final SingleObserver<? super T> downstream;

        Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
   = sd;
            this.downstream = observer;

        public void onSubscribe(Disposable d) {

        public void onSuccess(final T value) {
            sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit)); 

        public void onError(final Throwable e) {
            sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));

        final class OnSuccess implements Runnable {
            private final T value;

            OnSuccess(T value) {
                this.value = value;

            public void run() {

        final class OnError implements Runnable {
            private final Throwable e;

            OnError(Throwable e) {
                this.e = e;

            public void run() {

 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.disposables;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.disposables.Disposable;

 * A Disposable container that allows updating/replacing a Disposable
 * atomically and with respect of disposing the container itself.
 * <p>
 * The class extends AtomicReference directly so watch out for the API leak!
 * @since 2.0
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {

    private static final long serialVersionUID = -754898800686245608L;

     * Constructs an empty SequentialDisposable.
    public SequentialDisposable() {
        // nothing to do

     * Construct a SequentialDisposable with the initial Disposable provided.
     * @param initial the initial disposable, null allowed
    public SequentialDisposable(Disposable initial) {

     * Atomically: set the next disposable on this container and dispose the previous
     * one (if any) or dispose next if the container has been disposed.
     * @param next the Disposable to set, may be null
     * @return true if the operation succeeded, false if the container has been disposed
     * @see #replace(Disposable)
    public boolean update(Disposable next) {
        return DisposableHelper.set(this, next);

     * Atomically: set the next disposable on this container but don't dispose the previous
     * one (if any) or dispose next if the container has been disposed.
     * @param next the Disposable to set, may be null
     * @return true if the operation succeeded, false if the container has been disposed
     * @see #update(Disposable)
    public boolean replace(Disposable next) {
        return DisposableHelper.replace(this, next);

    public void dispose() {

    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());

 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.observable;

import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver;

import java.util.Objects;

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        this.function = function;

    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            this.mapper = mapper;

        public void onNext(T t) {
            if (done) {

            if (sourceMode != NONE) {

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {

        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);

        public U poll() throws Throwable {
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;

   public final void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {

            this.upstream = d;
//有上下游 且可中断的
            if (d instanceof QueueDisposable) {
                this.qd = (QueueDisposable<T>)d;

            if (beforeDownstream()) {




//dispose 上游dispose
    public void dispose() {


 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;

public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
    final long delay;
    final TimeUnit unit;
    final Scheduler scheduler;
    final boolean delayError;

    public ObservableDelay(ObservableSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;

    public void subscribeActual(Observer<? super T> t) {
        Observer<T> observer;
        if (delayError) {
            observer = (Observer<T>)t; //
        } else {
            observer = new SerializedObserver<>(t);

        Scheduler.Worker w = scheduler.createWorker();//线程调度器

        source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));

    static final class DelayObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        final long delay;
        final TimeUnit unit;
        final Scheduler.Worker w;
        final boolean delayError;

        Disposable upstream;

        DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
            this.downstream = actual;
            this.delay = delay;
            this.unit = unit;
            this.w = w;
            this.delayError = delayError;

        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;

        public void onNext(final T t) {
            w.schedule(new OnNext(t), delay, unit);

        public void onError(final Throwable t) {
            w.schedule(new OnError(t), delayError ? delay : 0, unit);//local延时

        public void onComplete() {
            w.schedule(new OnComplete(), delay, unit);

        public void dispose() {

        public boolean isDisposed() {
            return w.isDisposed();

        final class OnNext implements Runnable {
            private final T t;

            OnNext(T t) {
                this.t = t;

            public void run() {

        final class OnError implements Runnable {
            private final Throwable throwable;

            OnError(Throwable throwable) {
                this.throwable = throwable;

            public void run() {
                try {
                } finally {

        final class OnComplete implements Runnable {
            public void run() {
                try {
                } finally {

关联上下游 先验证 然后赋值
  public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                downstream.onSubscribe(this); //启动下游subsc

主要是看生产者有没有上游:1. 有上游(事件由上游生产);2. 没有上游(事件由自己生产)。

dispose 取消也是这个原则


 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;

    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);

        Disposable f = scheduler.scheduleDirect(parent); //runnable,切线程



    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;

        final SingleObserver<? super T> downstream;

        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.downstream = actual;
            this.source = source;
            this.task = new SequentialDisposable();

        public void onSubscribe(Disposable d) {
//d =上游传递的Disposable 
            DisposableHelper.setOnce(this, d);

        public void onSuccess(T value) {

        public void onError(Throwable e) {

        public void dispose() {
            task.dispose(); //内部取消

        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());

        public void run() {


 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;

    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual; 
            this.scheduler = scheduler;

        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) //赋值下游 
                downstream.onSubscribe(this); //调用下游的Disposable 
//取消的时候 下游也取消

        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);//置换切线程的任务,发消息前取消上游,然后切线程任务 

        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);

        public void run() {
            Throwable ex = error;
            if (ex != null) {
            } else {

        public void dispose() {
            DisposableHelper.dispose(this);//取消的时候 下游也取消

        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());

 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;

    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual;
            this.scheduler = scheduler;

        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {

        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this); //切线程
            DisposableHelper.replace(this, d);

        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this); //切线程
            DisposableHelper.replace(this, d);

        public void run() {
            Throwable ex = error;
            if (ex != null) {
            } else {

        public void dispose() {

        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());


    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();  //createWorker 1

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//钩子方法

        DisposeTask task = new DisposeTask(decoratedRun, w);//

        w.schedule(task, delay, unit);

        return task;

    public Worker createWorker() {
        return new NewThreadWorker(threadFactory); //threadFactory 2

public interface ScheduledExecutorService extends ExecutorService {


//本质通过线程池创建管理   Executors
 public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
        return exec;

切换到主线程:mainThread 通过 Looper.getMainLooper() 实现

static final Scheduler DEFAULT = internalFrom(Looper.getMainLooper(), true);
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
//发消息到主线程 handler =  new Handler(looper); loop = mainLoop
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;





随着全球能源的不断增加，能源的有限性与环境问题日益严重，用能管理企业需要一种高效的方法来管理能源与利用能源，因此能源监测管理系统成为了一种不可或缺的工具。 能源监测管理系统的重要性 1、实现节能减排的目标 通过系统，可…


CTQ 关键质量特性 CTQ是在六西格玛管理中常用的重要词汇，所以很多不同界别的人仕都可能听过，CTQ的意思是关键质量特性，Critical To Quality 的缩写。 六西格玛管理提倡的方法是通过客户的声音 (Voice of customer-VOC) ，然后把它…

【C语言】嵌套结构体初始化 - 一个有趣的结论

0. 前言 A. 嵌套结构体（比如双链表）的初始化一般是什么流程？ B. 嵌套结构体的内存是如何分布的？ C. 结构体中的结构体指针是否需要再次分配内存？不分配会怎么样？ 关于嵌套结构体的初始化问题，我…


项目场景： 提示：这里简述项目相关背景： 在使用Element-Plus的form的时候,label只有左右,居中对齐，缺少两端对齐的选项 故研究一下如何实现，其他方法也试过，都没效果，我在别人的基础上又研究了一…


语音识别正在侵入我们的生活。它内置于我们的手机、游戏机和智能手表中。它甚至使我们的房屋自动化。你只需 50 美元，你就可以获得一个 Amazon Echo Dot——一个神奇盒子，你只需大声说出你的需求就可以帮你订购披萨、获取天气预报甚至购买垃圾袋。 但是…




上一篇文章中我们介绍了LiveMedia视频监控汇聚管理平台视频接入方案中功能设计的设备接入方法。在这篇文章中我们来介绍下LiveMedia视频监控汇聚管理平台方案中功能设计里的流媒体转发是如何实现的？ 图1流媒体转发框架 平台流媒体转发框架如图1流媒体转发框架所示…


如果一个机器学习初学者，仅用三行代码就训练了一个模型，并且模型的性能要比从业数十年的都要好，这是一种什么样的感觉？ AutoGluon就能帮你梦想成真。 上面这张图片就是AutoGluon的工作流，多么简单啊！根据数…


1.被测试服务器端口输入htop指令进行cpu监控 2.测试机器安装宝塔-》我的工具-》进行网站测试 访问地址：

【FPGA】Verilog:移位寄存器 | 环形计数器 | 4bit移位寄存器的实现 | 4bit环形计数器的实现

目录 Ⅰ. 理论部分 0x00 移位寄存器（Shift Register） 0x01 环形计数器（Ring Counter） Ⅱ. 实践部分 0x00 移位寄存器（4-bit） 0x01 四位环形寄存器（4-bit） Ⅰ. 理论部分 0x00 …


博主介绍：✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久，选择我们就是选择放心、选择安心毕业✌ 🍅由于篇幅限制，想要获取完整文章或者源码，或者代做…

YOLOv8-Seg改进: 分割小目标系列篇 | SPD-Conv,提升分割小目标和弱小分割图精度

🚀🚀🚀本文改进:SPD-Conv由一个空间到深度(SPD)层和一个无卷积步长(Conv)层组成,可以应用于大多数CNN体系结构,特别是在处理低分辨率图像和分割小目标等更困难的任务时。 🚀🚀🚀SPD-Conv 分割小目标检测首选,暴力涨点 🚀🚀🚀YOLOv8-seg创新专栏:http:…




自我介绍: 面试官,您好! 我叫XXX,今年25岁,目前是XXX大学电子信息学院研三的一名学生,面试岗位是前端研发工程师。下面我将从以下几个方面来介绍自己: (学习能力)在校期间通过了英语四六级、以及一些计算机专业证书,成绩一直是专业前5%,每年都获得学…

Elastic Stack 8.11:引入一种新的强大查询语言 ES|QL

作者：Tyler Perkins, Ninoslav Miskovic, Gilad Gal, Teresa Soler, Shani Sagiv, Jason Burns Elastic Stack 8.11 引入了数据流生命周期、一种配置数据流保留和降采样（downsampling） 的简单方法（技术预览版）…


上一篇文章中我们介绍了LiveMedia视频监控汇聚管理平台技术方案的架构。今天我们来介绍下LiveMedia视频监控汇聚管理平台的视频接入方案。 视频集控平台建设充分考虑利旧的建设原则，同时根据各个现有视频监控建设情况，考虑统一规划、分布实施的建设方式。…




现在，我们将创建一个视图，将员工的姓名、部门和工资信息组合在一起。 CREATE VIEW EmployeeSalaryView AS SELECT e.FirstName, e.LastName, e.Department, s.MonthlySalary FROM Employees e JOIN Salary s ON e.EmployeeID = s.EmployeeID;通过这个视图…


修改参数，开启into outfile的功能 secure_file_priv=/home/backups/mysql_outfile 重启数据库使参数生效 按条件导出MySQL数据 select * from receipt_receive_log where gmt_create > '2020-04-13 00:00:00' and gmt_create < '2020-07-13 00:00:00' INTO O…

Azure 机器学习 - 有关为 Azure 机器学习配置 Kubernetes 群集的参考

目录 受支持的 Kubernetes 版本和区域建议的资源计划ARO 或 OCP 群集的先决条件禁用安全增强型 Linux (SELinux)ARO 和 OCP 的特权设置 收集的日志详细信息Azure 机器学习作业与自定义数据存储连接支持的 Azure 机器学习排斥和容许最佳实践 通过 HTTP 或 HTTPS 将其他入口控制器…