Android RxJava3 原理浅析

news2025/2/22 18:06:22

使用

  // Build a Retrofit client whose API methods return RxJava3 types, then request the
  // repo list on the io scheduler and deliver the result on the Android main thread.
  val retrofit = Retrofit.Builder()
      .baseUrl("https://api.github.com/")
      .addConverterFactory(GsonConverterFactory.create())
      .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
      .build()
  val api = retrofit.create(API::class.java)
  api.getRepo("rengwuxian")
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(object : SingleObserver<MutableList<Repo>> {
          override fun onSubscribe(d: Disposable?) {
              // Logs the thread onSubscribe is delivered on.
              LogUtil.logD("onSubscribe${Thread.currentThread().name}")
              clv1.text = "onSubscribe"
          }

          override fun onSuccess(t: MutableList<Repo>) {
              // The response list may be empty; t[0] would throw IndexOutOfBoundsException.
              val firstName = t.firstOrNull()?.name.orEmpty()
              LogUtil.logD(firstName)
              clv1.text = firstName
          }

          override fun onError(e: Throwable) {
              // Throwable.message is nullable; `!!` here would crash inside the error
              // handler itself. Fall back to the throwable's string form.
              val reason = e.message ?: e.toString()
              LogUtil.logD(reason)
              clv1.text = reason
          }
      })

RxJava需要我们主动切线程

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

也可以

.addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))

在所有配置中设为后台线程

.subscribeOn(Schedulers.io()) 那么这个就不需要了

源码:

/**
 * Private constructor: stores the two configuration values on the factory.
 *
 * @param scheduler scheduler kept in {@code this.scheduler}; may be null (marked {@code @Nullable})
 * @param isAsync flag kept as-is in {@code this.isAsync}
 */
private RxJava3CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.scheduler = scheduler;
    this.isAsync = isAsync;
  }

内部赋值

Single.just:发送瞬时事件(订阅时立即把已有的值发给观察者)

// Subscribes to a Single that already holds its value. SingleJust delivers onSubscribe
// and onSuccess synchronously inside subscribe(), so the callbacks must not throw:
// the IDE-generated TODO() stubs would raise NotImplementedError immediately.
Single.just("1")
    .subscribe(object : SingleObserver<String?> {
        override fun onSubscribe(d: Disposable?) {
            // Nothing to cancel: the value is delivered right away.
        }

        override fun onSuccess(t: String?) {
            // Receives "1"; handle the value here.
        }

        override fun onError(e: Throwable?) {
            // Not invoked by SingleJust, which only calls onSubscribe and onSuccess.
        }
    })

源码:

 /**
  * Creates a Single backed by a {@code SingleJust} that holds the given item.
  *
  * @param item the value to wrap; must not be null
  * @return the result of passing the new {@code SingleJust} through {@code RxJavaPlugins.onAssembly}
  * @throws NullPointerException if {@code item} is null
  */
 public static <@NonNull T> Single<T> just(T item) {
        Objects.requireNonNull(item, "item is null"); // null check comes first
        return RxJavaPlugins.onAssembly(new SingleJust<>(item)); // wrap, then run the assembly hook
    }


// Hook method: if a function is stored in onSingleAssembly, it is applied to the
// Single being assembled and its result is returned; otherwise the source is returned unchanged.
 public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
        Function<? super Single, ? extends Single> hook = onSingleAssembly;
        if (hook == null) {
            // No hook registered: pass the source straight through.
            return source;
        }
        return apply(hook, source);
    }

//Single
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;

/**
 * A Single that holds a ready-made value and hands it to every observer at subscription time.
 *
 * @param <T> the value type
 */
public final class SingleJust<T> extends Single<T> {

    // The value delivered to each observer.
    final T value;

    /**
     * @param value the value to deliver on subscription
     */
    public SingleJust(T value) {
        this.value = value;
    }

    /**
     * Delivers {@code onSubscribe} with {@code Disposable.disposed()} and then
     * {@code onSuccess} with the stored value, both synchronously on the calling thread.
     * {@code onError} is never called from here.
     */
    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }

}


关键方法
 @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        // Signal the subscription with the Disposable from Disposable.disposed(); there is
        // nothing to cancel. (Author's note: the returned object is an enum instance -
        // not visible in this excerpt, verify against the Disposable source.)
        observer.onSubscribe(Disposable.disposed());
        // Deliver the value. No error path exists here because the value is already available.
        observer.onSuccess(value);
    }




操作符: map,转换对象类型

 // map converts the emitted value's type: here Int -> String.
 val single: Single<Int> = Single.just(1)
 // The mapper is passed as a lambda (SAM conversion) instead of an anonymous Function object.
 val singleString = single.map { number -> number.toString() }

多个single map

按时间发送数据: 从0开始发送,间隔1秒

 // Emits 0, 1, 2, ... with no initial delay and a 1-second period, showing each value in clv1.
 // The three-argument interval() runs on Schedulers.computation(), so the stream is moved
 // to the Android main thread before the view is touched.
 Observable.interval(0, 1, TimeUnit.SECONDS)
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(object : Observer<Long> {
         override fun onSubscribe(d: Disposable?) {
             // interval never completes on its own: keep `d` and dispose it when the
             // view is torn down. (A TODO() here would throw as soon as subscribe() runs.)
         }

         override fun onNext(t: Long?) {
             // Avoid `!!`; show an empty string should a null ever arrive.
             clv1.text = t?.toString().orEmpty()
         }

         override fun onError(e: Throwable?) {
             // No error handling needed for this demo; must not throw.
         }

         override fun onComplete() {
             // Not expected for an endless interval; must not throw.
         }
     })
 /**
  * Overload without an explicit scheduler: delegates to the four-argument
  * {@code interval}, passing {@code Schedulers.computation()} as the scheduler.
  *
  * @param initialDelay delay before the first emission, in {@code unit}
  * @param period interval between emissions, in {@code unit}
  * @param unit time unit of {@code initialDelay} and {@code period}
  */
 public static Observable<Long> interval(long initialDelay, long period, @NonNull TimeUnit unit) {
        return interval(initialDelay, period, unit, Schedulers.computation());
    }

interval

/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;

/**
 * Observable that emits an increasing {@code Long} counter, starting at 0, each time its
 * periodic task is run by the configured scheduler.
 */
public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    /**
     * @param initialDelay delay before the first run, in {@code unit}
     * @param period time between runs, in {@code unit}
     * @param unit time unit of the two durations
     * @param scheduler scheduler the periodic task is submitted to
     */
    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    /**
     * Creates one IntervalObserver per subscriber, hands it to the observer as its
     * Disposable, then schedules it periodically and records the scheduled work inside it.
     */
    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is); // the observer receives the IntervalObserver as its Disposable

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            // Trampoline case: schedule through an explicit Worker and keep the worker as the resource.
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            // Otherwise schedule directly and keep the returned Disposable as the resource.
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }

    /**
     * The periodic task and the subscriber-facing Disposable in one object. The inherited
     * AtomicReference slot holds the Disposable of the scheduled work.
     */
    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

        private static final long serialVersionUID = 346773832286157679L;

        final Observer<? super Long> downstream;

        // Next value to emit; incremented after every emission.
        long count;

        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }

        /** Disposes via {@code DisposableHelper.dispose} on this reference. */
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        /** @return true once the slot holds {@code DisposableHelper.DISPOSED} */
        @Override
        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;
        }

        /** Emits the current counter value and increments it, unless already disposed. */
        @Override
        public void run() {
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }

        /** Stores the Disposable of the scheduled work via {@code DisposableHelper.setOnce}. */
        public void setResource(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
}







===
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;

/**
 * Observable that emits an increasing {@code Long} counter each time its periodic task
 * is run by the configured scheduler.
 */
public final class ObservableInterval extends Observable<Long> {
    final Scheduler scheduler;
    final long initialDelay;
    final long period;
    final TimeUnit unit;

    public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
        this.initialDelay = initialDelay;
        this.period = period;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    /**
     * Gives the observer an IntervalObserver as its Disposable, schedules that same object
     * periodically, and stores the scheduled work inside it.
     */
    @Override
    public void subscribeActual(Observer<? super Long> observer) {
        IntervalObserver is = new IntervalObserver(observer);
        observer.onSubscribe(is);

        Scheduler sch = scheduler;

        if (sch instanceof TrampolineScheduler) {
            Worker worker = sch.createWorker();
            is.setResource(worker);
            worker.schedulePeriodically(is, initialDelay, period, unit);
        } else {
            Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
            is.setResource(d);
        }
    }

// Implements Disposable and extends AtomicReference<Disposable>, so the Disposable it
// holds is read and swapped atomically.
    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {

        private static final long serialVersionUID = 346773832286157679L;

        final Observer<? super Long> downstream;

        // Next value to emit; incremented after every emission.
        long count;

        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this); // disposes the Disposable held in this reference
        }

        @Override
        public boolean isDisposed() {
            return get() == DisposableHelper.DISPOSED;
        }

        // Emits the counter and increments it, unless this reference has been disposed.
        @Override
        public void run() {
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }

        // Stores the Disposable of the scheduled work via DisposableHelper.setOnce.
        public void setResource(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
}

 // dispose
 /**
  * Atomically moves the reference to the DISPOSED marker and disposes whatever
  * Disposable it held before.
  *
  * @param field the reference to dispose
  * @return true if this call performed the switch to DISPOSED, false if the
  *         reference already held DISPOSED
  */
 public static boolean dispose(AtomicReference<Disposable> field) {
        final Disposable terminal = DISPOSED;
        // Already disposed: nothing to do.
        if (field.get() == terminal) {
            return false;
        }
        Disposable previous = field.getAndSet(terminal);
        // Another caller swapped in DISPOSED between the read above and this swap.
        if (previous == terminal) {
            return false;
        }
        // The slot may have been empty; only dispose a real previous value.
        if (previous != null) {
            previous.dispose();
        }
        return true;
    }

// Used by setResource: stores the given Disposable in the reference, but only if the
// reference is still empty (it can be set once).
    /**
     * @param field the reference to fill
     * @param d the Disposable to store; must not be null
     * @return true if {@code d} was stored; false if the reference was not empty, in which
     *         case {@code d} is disposed and, unless the reference holds DISPOSED,
     *         {@code reportDisposableSet()} is called
     */
    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        Objects.requireNonNull(d, "d is null");
        if (field.compareAndSet(null, d)) {
            return true;
        }
        // The slot was occupied: the newcomer is not kept, so dispose it.
        d.dispose();
        if (field.get() != DISPOSED) {
            reportDisposableSet();
        }
        return false;
    }

实际设置定时任务的代码: ObservableInterval->subscribeActual

// Submits `is` (the IntervalObserver, which is a Runnable) for periodic execution;
// the returned Disposable represents that scheduled work.
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
// IntervalObserver.run(): the body of the periodic task.
@Override
public void run() {
    // Emit the counter and increment it, unless the observer has been disposed.
    if (get() != DisposableHelper.DISPOSED) {
        downstream.onNext(count++);
    }
}
ObservableInterval 内部为每次订阅创建一个 IntervalObserver,由它负责定时任务的执行和取消

Single.delay(对应的实现类是 SingleDelay)

/**
 * Overload without an explicit scheduler: delegates to the four-argument {@code delay},
 * passing {@code Schedulers.computation()} and {@code false} for the last argument.
 *
 * @param time the delay amount, in {@code unit}
 * @param unit time unit of {@code time}
 */
public final Single<T> delay(long time, @NonNull TimeUnit unit) {
    return delay(time, unit, Schedulers.computation(), false);
}
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;

/**
 * Single that forwards the source's success or error to the downstream through a task
 * scheduled on the given scheduler, after the configured delay.
 *
 * @param <T> the value type
 */
public final class SingleDelay<T> extends Single<T> {

    final SingleSource<? extends T> source;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    // When true, errors are delayed by `time` as well; when false they are scheduled with delay 0.
    final boolean delayError;

    public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        this.source = source;
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    /**
     * Gives the downstream a SequentialDisposable as its Disposable, then subscribes to
     * the source with a Delay observer that shares the same container.
     */
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {

        final SequentialDisposable sd = new SequentialDisposable(); // container owned by this subscription
        observer.onSubscribe(sd); // the downstream receives sd as its Disposable
        source.subscribe(new Delay(sd, observer)); // subscribe to the upstream with the Delay observer
    }

    /** Upstream-facing observer: turns each terminal signal into a scheduled task. */
    final class Delay implements SingleObserver<T> {
        private final SequentialDisposable sd;
        final SingleObserver<? super T> downstream;

        Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
            this.sd = sd;
            this.downstream = observer;
        }

        // Puts the upstream's Disposable into the shared container.
        @Override
        public void onSubscribe(Disposable d) {
            sd.replace(d);
        }

        @Override
        public void onSuccess(final T value) {
            sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit)); 
// Schedules the delayed delivery and swaps the container's content to the scheduled
// task, so the container now holds this local task instead of the upstream Disposable.
        }

        // Same as onSuccess, except the delay is `time` only when delayError is set, else 0.
        @Override
        public void onError(final Throwable e) {
            sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
        }

        /** Task that delivers the stored value to the downstream. */
        final class OnSuccess implements Runnable {
            private final T value;

            OnSuccess(T value) {
                this.value = value;
            }

            @Override
            public void run() {
                downstream.onSuccess(value);
            }
        }

        /** Task that delivers the stored error to the downstream. */
        final class OnError implements Runnable {
            private final Throwable e;

            OnError(Throwable e) {
                this.e = e;
            }

            @Override
            public void run() {
                downstream.onError(e);
            }
        }
    }
}




/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.disposables;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.disposables.Disposable;

/**
 * A Disposable container that allows updating/replacing a Disposable
 * atomically and with respect of disposing the container itself.
 * <p>
 * The class extends AtomicReference directly so watch out for the API leak!
 * @since 2.0
 */
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {

    private static final long serialVersionUID = -754898800686245608L;

    /**
     * Constructs an empty SequentialDisposable.
     */
    public SequentialDisposable() {
        // nothing to do
    }

    /**
     * Construct a SequentialDisposable with the initial Disposable provided.
     * @param initial the initial disposable, null allowed
     */
    public SequentialDisposable(Disposable initial) {
        lazySet(initial);
    }

    /**
     * Atomically: set the next disposable on this container and dispose the previous
     * one (if any) or dispose next if the container has been disposed.
     * @param next the Disposable to set, may be null
     * @return true if the operation succeeded, false if the container has been disposed
     * @see #replace(Disposable)
     */
    public boolean update(Disposable next) {
        return DisposableHelper.set(this, next);
    }

    /**
     * Atomically: set the next disposable on this container but don't dispose the previous
     * one (if any) or dispose next if the container has been disposed.
     * @param next the Disposable to set, may be null
     * @return true if the operation succeeded, false if the container has been disposed
     * @see #update(Disposable)
     */
    public boolean replace(Disposable next) {
        return DisposableHelper.replace(this, next);
    }

    /**
     * Disposes this container by delegating to {@code DisposableHelper.dispose} on
     * this reference.
     */
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    /**
     * @return the result of {@code DisposableHelper.isDisposed} for the currently held value
     */
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}

ObservableMap
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver;

import java.util.Objects;

/**
 * Observable that applies a mapper function to each upstream item and emits the result.
 *
 * @param <T> the upstream item type
 * @param <U> the emitted item type
 */
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    /**
     * @param source the upstream to subscribe to
     * @param function the mapper applied to every upstream item
     */
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
// Subscribes to the upstream source with a MapObserver wrapping the downstream observer.
    }

    /** Observer placed between upstream and downstream that performs the mapping. */
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        /**
         * Maps the item and forwards the result. Does nothing once {@code done} is set.
         * A null mapper result or an exception from the mapper is routed to {@code fail}.
         */
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            // When sourceMode is not NONE, null is forwarded instead of a mapped value
            // (presumably a fusion-mode signal - confirm in BasicFuseableObserver).
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        /**
         * Polls the upstream queue {@code qd}; a non-null item is mapped (the result must
         * be non-null), a null item is returned as null.
         */
        @Nullable
        @Override
        public U poll() throws Throwable {
            T t = qd.poll();
            return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

// Upstream side: receives the upstream's Disposable (presumably
// BasicFuseableObserver.onSubscribe - the enclosing class is not shown here).
   public final void onSubscribe(Disposable d) {
        // Proceed only if validate accepts d against the current upstream value.
        if (DisposableHelper.validate(this.upstream, d)) {

            this.upstream = d;
// Author's note on QueueDisposable: it has both an upstream and a downstream side and
// can be cancelled. The code keeps a typed reference to it in qd.
            if (d instanceof QueueDisposable) {
                this.qd = (QueueDisposable<T>)d;
            }

            // Pass this observer to the downstream as its Disposable, between the two hooks.
            if (beforeDownstream()) {

                downstream.onSubscribe(this);

                afterDownstream();
            }

        }
    }

// dispose(): forwards the disposal to the upstream Disposable.
 @Override
    public void dispose() {
        upstream.dispose();
    }


ObservableDelay

/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;

/**
 * Observable that re-emits every upstream signal through a task scheduled on a
 * scheduler worker after the configured delay.
 *
 * @param <T> the item type
 */
public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {
    final long delay;
    final TimeUnit unit;
    final Scheduler scheduler;
    // When true, errors are delayed too; when false they are scheduled with delay 0.
    final boolean delayError;

    public ObservableDelay(ObservableSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        super(source);
        this.delay = delay;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    /**
     * Picks the observer to deliver to, creates one worker for this subscription and
     * subscribes to the source with a DelayObserver.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void subscribeActual(Observer<? super T> t) {
        Observer<T> observer;
        if (delayError) {
            observer = (Observer<T>)t; // delayError: use the downstream as-is (unchecked cast)
        } else {
            observer = new SerializedObserver<>(t); // otherwise wrap the downstream in a SerializedObserver
        }

        Scheduler.Worker w = scheduler.createWorker();// worker obtained from the scheduler; runs the delayed tasks

        source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
// The DelayObserver is what the upstream sees as its observer.
    }

    /** Observer that schedules each incoming signal on the worker instead of forwarding it directly. */
    static final class DelayObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        final long delay;
        final TimeUnit unit;
        final Scheduler.Worker w;
        final boolean delayError;

        // The upstream's Disposable, set in onSubscribe.
        Disposable upstream;

        DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {
            super();
            this.downstream = actual;
            this.delay = delay;
            this.unit = unit;
            this.w = w;
            this.delayError = delayError;
        }

        // Stores the upstream Disposable and passes this observer downstream as its Disposable.
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onNext(final T t) {
            w.schedule(new OnNext(t), delay, unit);
        }

        @Override
        public void onError(final Throwable t) {
            w.schedule(new OnError(t), delayError ? delay : 0, unit);// delayed locally: full delay only when delayError is set
        }

        @Override
        public void onComplete() {
            w.schedule(new OnComplete(), delay, unit);
        }

        // Disposes the upstream first, then the worker that runs the delayed tasks.
        @Override
        public void dispose() {
            upstream.dispose();
            w.dispose();
        }

        // Reports the worker's disposed state.
        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }

        /** Task that delivers one item to the downstream. */
        final class OnNext implements Runnable {
            private final T t;

            OnNext(T t) {
                this.t = t;
            }

            @Override
            public void run() {
                downstream.onNext(t);
            }
        }

        /** Task that delivers the error, then disposes the worker even if the downstream throws. */
        final class OnError implements Runnable {
            private final Throwable throwable;

            OnError(Throwable throwable) {
                this.throwable = throwable;
            }

            @Override
            public void run() {
                try {
                    downstream.onError(throwable);
                } finally {
                    w.dispose();
                }
            }
        }

        /** Task that delivers completion, then disposes the worker even if the downstream throws. */
        final class OnComplete implements Runnable {
            @Override
            public void run() {
                try {
                    downstream.onComplete();
                } finally {
                    w.dispose();
                }
            }
        }
    }
}



关联上下游 先验证 然后赋值
  // Links upstream and downstream: validate first, then store the upstream Disposable.
  public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                downstream.onSubscribe(this); // pass this observer to the downstream as its Disposable
            }
        }




主要是看生产者属于哪一种:1. 有上游,事件来自上游;2. 没有上游,事件由自己生产。

dispose 取消也是这个原则

线程切换:

subscribeOn
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;

/**
 * Single that performs the subscription to its source inside a task submitted to the
 * given scheduler.
 *
 * @param <T> the value type
 */
public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    /**
     * Hands the downstream a SubscribeOnObserver as its Disposable (on the calling
     * thread), then schedules that same object so the source is subscribed from the scheduler.
     */
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        observer.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent); // parent is a Runnable; its run() executes on the scheduler

        parent.task.replace(f);// keep the scheduled task so dispose() can cancel it

    }

    /**
     * Observer, Disposable and Runnable in one. The inherited AtomicReference slot holds
     * the upstream's Disposable; {@code task} holds the scheduled subscription task.
     */
    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;

        final SingleObserver<? super T> downstream;

        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.downstream = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
// d is the Disposable handed over by the upstream; it is stored in this reference.
            DisposableHelper.setOnce(this, d);
        }

        // Success and error are forwarded to the downstream unchanged.
        @Override
        public void onSuccess(T value) {
            downstream.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            downstream.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);// dispose the upstream Disposable held in this reference
            task.dispose(); // dispose the scheduled subscription task
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this);// subscribe to the upstream source
        }
    }

}

SingleObserveOn:
/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

/**
 * Single that delivers the source's success or error to the downstream from a task
 * submitted to the given scheduler.
 *
 * @param <T> the value type
 */
public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
// Subscribes to the source directly; nothing is scheduled at subscription time.
    }

    /**
     * Observer, Disposable and Runnable in one. The inherited AtomicReference slot first
     * holds the upstream's Disposable and later the scheduled delivery task.
     */
    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        // Terminal signal captured on the upstream side and read back in run().
        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual; 
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) // store the upstream Disposable in this reference (only if still empty)
{
                downstream.onSubscribe(this); // the downstream receives this observer as its Disposable
// so when the downstream disposes, dispose() below runs on whatever this reference holds
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);// swap the held Disposable to the scheduled delivery task; run() then executes on the scheduler
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        // Delivers the captured signal: the error if one was stored, otherwise the value.
        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                downstream.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);// disposes whatever is currently held: the upstream Disposable or the scheduled task
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}




/*
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

/**
 * Single operator that subscribes to a source and re-delivers its terminal
 * signal (success value or error) to the downstream observer from a task
 * scheduled on the given Scheduler.
 *
 * @param <T> the value type
 */
public final class SingleObserveOn<T> extends Single<T> {

    /** The upstream source that is subscribed to in subscribeActual. */
    final SingleSource<T> source;

    /** The scheduler on which the terminal signal is delivered. */
    final Scheduler scheduler;

    /**
     * @param source the upstream source
     * @param scheduler the scheduler used to deliver the terminal signal
     */
    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    /**
     * Wraps the downstream observer in an ObserveOnSingleObserver and subscribes
     * that wrapper to the source.
     */
    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
    }

    /**
     * Observer placed between the source and the downstream. It is at once:
     * the SingleObserver subscribed to the source, the Disposable handed to the
     * downstream, the Runnable scheduled on the scheduler, and an
     * AtomicReference holding the currently relevant Disposable (first the
     * upstream one, later the scheduled task).
     */
    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        /** The observer that finally receives the signal. */
        final SingleObserver<? super T> downstream;

        /** The scheduler on which run() is executed. */
        final Scheduler scheduler;

        // Terminal signal captured on the upstream thread and read in run().
        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual;
            this.scheduler = scheduler;
        }

        /**
         * Stores the upstream Disposable in this reference and, on success,
         * passes this observer to the downstream as its Disposable.
         */
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                downstream.onSubscribe(this);
            }
        }

        /**
         * Saves the value, then schedules this observer on the scheduler and
         * keeps the returned task Disposable in this reference.
         */
        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this); // thread hop: run() is executed by the scheduler
            DisposableHelper.replace(this, d);
        }

        /**
         * Saves the error, then schedules this observer on the scheduler and
         * keeps the returned task Disposable in this reference.
         */
        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this); // thread hop: run() is executed by the scheduler
            DisposableHelper.replace(this, d);
        }

        /**
         * Scheduled task body: forwards the stored error if there is one,
         * otherwise the stored value, to the downstream.
         */
        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                downstream.onSuccess(value);
            }
        }

        /** Disposes the Disposable currently held by this reference. */
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        /** Reports whether the currently held Disposable counts as disposed. */
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}

切线程:Scheduler.scheduleDirect 通过 Worker 完成线程切换

   /**
    * Schedules {@code run} once, after the given delay, on a worker obtained
    * from {@link #createWorker()}.
    *
    * @param run the task to execute
    * @param delay how long to wait before execution
    * @param unit the time unit of {@code delay}
    * @return the DisposeTask wrapping the task and its worker
    */
   @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // Step 1: obtain a worker (see createWorker).
        final Worker worker = createWorker();

        // Route the task through the RxJavaPlugins schedule hook, then bundle the
        // hooked task with the worker in a DisposeTask.
        final DisposeTask disposeTask = new DisposeTask(RxJavaPlugins.onSchedule(run), worker);

        worker.schedule(disposeTask, delay, unit);
        return disposeTask;
    }


1
  // Step 1 of scheduleDirect: each call returns a new NewThreadWorker built from this scheduler's threadFactory.
  @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory); // threadFactory: see note 2 below
    }

2 
// java.util.concurrent.ScheduledExecutorService, shown abridged: its members are omitted in this excerpt.
public interface ScheduledExecutorService extends ExecutorService {

}



// In essence, threads are created and managed through a JDK thread pool.
/**
 * Builds the executor: a ScheduledThreadPoolExecutor with a core pool size of 1
 * whose threads come from the supplied factory.
 *
 * @param factory the thread factory handed to the executor
 * @return the configured executor
 */
 public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1, factory);
        // Remove-on-cancel is driven by the PURGE_ENABLED flag.
        pool.setRemoveOnCancelPolicy(PURGE_ENABLED);
        return pool;
    }

切换到主线程:AndroidSchedulers.mainThread() 通过 Looper.getMainLooper() 实现

// Default scheduler built from the main thread's Looper. NOTE(review): the second argument (true) is presumably the async flag read in scheduleDirect - confirm against internalFrom.
static final Scheduler DEFAULT = internalFrom(Looper.getMainLooper(), true);
    /**
     * Schedules {@code run} by posting it as a Message to this scheduler's handler
     * after the given delay.
     *
     * @param run the task to execute; must not be null
     * @param delay how long to wait before the message is delivered
     * @param unit the time unit of {@code delay}; must not be null
     * @return the ScheduledRunnable wrapping the task
     * @throws NullPointerException if {@code run} or {@code unit} is null
     */
    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) {
            throw new NullPointerException("run == null");
        }
        if (unit == null) {
            throw new NullPointerException("unit == null");
        }

        // Route the task through the RxJavaPlugins schedule hook and wrap it for the handler.
        final Runnable hooked = RxJavaPlugins.onSchedule(run);
        final ScheduledRunnable task = new ScheduledRunnable(handler, hooked);

        final Message msg = Message.obtain(handler, task);
        if (async) {
            msg.setAsynchronous(true);
        }

        // Deliver through the handler; for the main-thread scheduler the handler is
        // presumably bound to Looper.getMainLooper(), so the task runs on the main thread.
        final long delayMillis = unit.toMillis(delay);
        handler.sendMessageDelayed(msg, delayMillis);
        return task;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1192258.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

能源监测管理系统有哪些作用与效果?

随着全球能源的不断增加,能源的有限性与环境问题日益严重,用能管理企业需要一种高效的方法来管理能源与利用能源,因此能源监测管理系统成为了一种不可或缺的工具。 能源监测管理系统的重要性 1、实现节能减排的目标 通过系统,可…

优思学院|CTP和CTQ是什么?有什么区别?

CTQ 关键质量特性 CTQ是在六西格玛管理中常用的重要词汇,所以很多不同界别的人士都可能听过,CTQ的意思是关键质量特性,Critical To Quality 的缩写。 六西格玛管理提倡的方法是通过客户的声音 (Voice of customer-VOC) ,然后把它…

【C语言】嵌套结构体初始化 - 一个有趣的结论

0. 前言 A. 嵌套结构体(比如双链表)的初始化一般是什么流程? B. 嵌套结构体的内存是如何分布的? C. 结构体中的结构体指针是否需要再次分配内存?不分配会怎么样? 关于嵌套结构体的初始化问题,我…

Element-Plus表单label实现两端对齐(左右对齐)

项目场景: 提示:这里简述项目相关背景: 在使用Element-Plus的form的时候,label只有左右,居中对齐,缺少两端对齐的选项 故研究一下如何实现,其他方法也试过,都没效果,我在别人的基础上又研究了一…

【深度学习】深度学习下的语音识别

语音识别正在侵入我们的生活。它内置于我们的手机、游戏机和智能手表中。它甚至使我们的房屋自动化。你只需 50 美元,你就可以获得一个 Amazon Echo Dot——一个神奇盒子,你只需大声说出你的需求就可以帮你订购披萨、获取天气预报甚至购买垃圾袋。 但是…

【机器学习】给大家推荐几个资源

我写博客的目的就是让大家了解人工智能背后的数学原理,但人工智能这个话题太大了,背后涉及到的知识非常庞大,仅靠写几篇文章传播力度有限,况且知识传播过程中也容易引入误解,所以授之以鱼不如授之以渔,这里…

LiveMedia视频监控汇聚管理平台方案(三)

上一篇文章中我们介绍了LiveMedia视频监控汇聚管理平台视频接入方案中功能设计的设备接入方法。在这篇文章中我们来介绍下LiveMedia视频监控汇聚管理平台方案中功能设计里的流媒体转发是如何实现的? 图1流媒体转发框架 平台流媒体转发框架如图1流媒体转发框架所示…

AutoGluon:亚马逊自动机器学习工具,初学者的福音

如果一个机器学习初学者,仅用三行代码就训练了一个模型,并且模型的性能要比从业数十年的都要好,这是一种什么样的感觉? AutoGluon就能帮你梦想成真。 上面这张图片就是AutoGluon的工作流,多么简单啊!根据数…

网站接口测试记录

1.被测试服务器端口输入htop指令进行cpu监控 2.测试机器安装宝塔-》我的工具-》进行网站测试 访问地址:https://www.bt.cn/bbs/thread-52772-1-1.html

【FGPA】Verilog:移位寄存器 | 环形计数器 | 4bit移位寄存器的实现 | 4bit环形计数器的实现

目录 Ⅰ. 理论部分 0x00 移位寄存器(Shift Register) 0x01 环形计数器(Ring Counter) Ⅱ. 实践部分 0x00 移位寄存器(4-bit) 0x01 四位环形寄存器(4-bit) Ⅰ. 理论部分 0x00 …

springboot+vue健美操评分系统的设计与实现【内含源码+文档+部署教程】

博主介绍:✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久,选择我们就是选择放心、选择安心毕业✌ 🍅由于篇幅限制,想要获取完整文章或者源码,或者代做…

YOLOv8-Seg改进: 分割小目标系列篇 | SPD-Conv,提升分割小目标和弱小分割图精度

🚀🚀🚀本文改进:SPD-Conv由一个空间到深度(SPD)层和一个无卷积步长(Conv)层组成,可以应用于大多数CNN体系结构,特别是在处理低分辨率图像和分割小目标等更困难的任务时。 🚀🚀🚀SPD-Conv 分割小目标检测首选,暴力涨点 🚀🚀🚀YOLOv8-seg创新专栏:http:…

剑指JUC原理-16.读写锁

👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家📕系列专栏:Spring源码、JUC源码🔥如果感觉博主的文章还不错的话,请👍三连支持…

炮炮面试——简历中专业技能

自我介绍: 面试官,您好! 我叫XXX,今年25岁,目前是XXX大学电子信息学院研三的一名学生,面试岗位是前端研发工程师。下面我将从以下几个方面来介绍自己: (学习能力)在校期间通过了英语四六级、以及一些计算机专业证书,成绩一直是专业前5%,每年都获得学…

Elastic Stack 8.11:引入一种新的强大查询语言 ES|QL

作者:Tyler Perkins, Ninoslav Miskovic, Gilad Gal, Teresa Soler, Shani Sagiv, Jason Burns Elastic Stack 8.11 引入了数据流生命周期、一种配置数据流保留和降采样(downsampling) 的简单方法(技术预览版):…

LiveMedia视频监控汇聚管理平台视频接入方案(二)

上一篇文章中我们介绍了LiveMedia视频监控汇聚管理平台技术方案的架构。今天我们来介绍下LiveMedia视频监控汇聚管理平台的视频接入方案。 视频集控平台建设充分考虑利旧的建设原则,同时根据各个现有视频监控建设情况,考虑统一规划、分布实施的建设方式。…

电脑怎么录制视频,录制的视频怎么剪辑?

在现今数字化的时代,视频成为了人们日常生活中不可或缺的一部分。因此,对于一些需要制作视频教程、录制游戏或者是进行视频演示的人来说,电脑录屏已经成为了一个必不可少的工具。那么,对于这些人来说,如何选择一个好用…

Mysql视图应用

现在,我们将创建一个视图,将员工的姓名、部门和工资信息组合在一起。 CREATE VIEW EmployeeSalaryView AS SELECT e.FirstName, e.LastName, e.Department, s.MonthlySalary FROM Employees e JOIN Salary s ON e.EmployeeID = s.EmployeeID;通过这个视图…

MySQL大表数据导入到MongoDB

修改参数 ,开启into outfile的功能 secure_file_priv=/home/backups/mysql_outfile 重启数据库使参数生效 按条件导出MySQL数据 select * from receipt_receive_log where gmt_create > '2020-04-13 00:00:00' and gmt_create < '2020-07-13 00:00:00' INTO O…

Azure 机器学习 - 有关为 Azure 机器学习配置 Kubernetes 群集的参考

目录 受支持的 Kubernetes 版本和区域建议的资源计划ARO 或 OCP 群集的先决条件禁用安全增强型 Linux (SELinux)ARO 和 OCP 的特权设置 收集的日志详细信息Azure 机器学习作业与自定义数据存储连接支持的 Azure 机器学习排斥和容许最佳实践 通过 HTTP 或 HTTPS 将其他入口控制器…