Dispatchers维护着一个线程池,3个双端队列,准备执行的AsynCall,正在执行的AsynCall,正在执行的同步Call(RealCall)。
同时规定每个Host最多同时请求5个Request,同时可最多执行64个Request。
public final class Dispatcher {
private int maxRequests = 64;// 同时执行的最大请求数64
private int maxRequestsPerHost = 5;// 同一个host最大可同时执行的Request个数
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
}
Dispatcher的线程池
线程池回忆
先回忆一波线程池的运行:
- 核心线程池:一开始会根据task创建最多核心线程池个线程,超过核心线程数的task会被放入阻塞队列。如果设置了allowCoreThreadTimeout=true,则核心线程池在获取task执行超过keepAliveTime时间,线程数会减少,否则核心线程数个线程会一直存在,阻塞等待workQueue中来task。
- 最大线程池:如果task超过阻塞队列,则会开始创建线程,直到最大线程池个线程。
- keepAliveTime:控制获取task的时间,超过这个时间还获取不到task执行,将会减少线程数目。所以变相等于线程存活的时间。
- workQueue(BlockingQueue):workQueue的类型决定了当
- ThreadFactory:创建线程,决定线程名的策略。
Dispatcher默认的线程池规定
public synchronized ExecutorService executorService() {
if (executorService == null) {
// keepAliveTime为60s。线程最多等待60s,没有任务则销毁。
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
- 没有核心线程,并且最大线程数为MAX_VALUE,并且使用的阻塞队列为SynchronousQueue,即有task提交的时候,如果有空闲线程,则使用空闲线程,否则新创建一个线程来执行task。并且线程在等待blockingqueue.take方法keepAliveTime,即这里的60s之后,没有task之后,线程将会被销毁。
setMaxRequests
maxRequests指的是同时运行的最大Requests的个数。
设置maxRequests的同时,会从readyAsyncCalls中取call放入runningAsyncCalls和放入线程池中执行。直到正在执行的runningAsynCall的个数>=maxRequests。
取出的call要满足<maxRequestPerHost。
一个host默认最多同时发起5个Request,总共可同时发起64个Request。
public synchronized void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
// 如果call的host正在执行的Requests个数<maxRequestPerHost,就将这个call加入执行队列
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
// 获取正在执行的异步call的host与传入的call host相同的个数,即发起call的host已经有多少个Request在执行
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
这个promoteCalls方法主要作用就是如果检查到runningAsyncCalls中有空缺(<64),就从readyAsyncCalls中取出一个Call丢到runningAsyncCalls,和丢到线程池中去执行。
这个方法除了会被setMaxRequests调用,还会在AsyncCall执行完成之后调用的finshed调用(这部分后面会讲到)。
enqueue
enqueue接收的参数是AsyncCall。AsyncCall实质是一个Runnable,实现了Runnable接口。
enqueue方法主要检查是否可以让AsyncCall直接执行。如果runningSyncCalls队列还有空位(runningAsyncCalls.size < maxRequests),并且AsyncCall所属的host还没有满员,就可以直接执行。
否则,加入readyAsyncCalls队列中。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
executed
// 这里executed只是将Call加入同步队列,但是在RealCall方法里的execute会直接调用getResponse方法,那里才是真正获取Response的地方
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);//
}
executed方法接收的参数是一个RealCall。由于executed方法代表的是同步执行,所以这里只是将Call加入了队列。具体的执行会在RealCall中被调用(详情看后面的RealCall篇介绍)。
setIdleCallback
设置当dispatchers空闲时候的回调,即当runningSyncCalls和runningAsyncCalls都为空时调用。
/**
* Set a callback to be invoked each time the dispatcher becomes idle (when the number of running
* calls returns to zero).
*
* <p>Note: The time at which a {@linkplain Call call} is considered idle is different depending
* on whether it was run {@linkplain Call#enqueue(Callback) asynchronously} or
* {@linkplain Call#execute() synchronously}. Asynchronous calls become idle after the
* {@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has
* returned. Synchronous calls become idle once {@link Call#execute() execute()} returns. This
* means that if you are doing synchronous calls the network layer will not truly be idle until
* every returned {@link Response} has been closed.
*/
public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
this.idleCallback = idleCallback;
}
触发时机是finished方法,finished方法会在RealCall / AsyncCall执行完成的时候调用。
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();// runningSyncCalls.size + runningAsynCalls.size
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
总结
- Dispatcher管理着一个线程池,3个calls,分别是同步正在执行的calls:runningSyncCalls,异步正在执行的calls:runningAsyncCalls,正在排队的异步calls,readyAsyncCalls。没有正在排队的同步calls,因为同步calls都是马上执行的(唯一有一个问题,没有看到同步call到底在哪里执行,异步call好歹调用了excutorService.execute了)。
- Dispatchers的线程池配置为:没有核心线程池,最大线程数为MAX_VALUE,同步队列为synchronousQueue,keepAliveTime为60s,这代表,只要有task,如果有空闲线程,就用空闲线程执行,否则会创建新的线程去执行task。当线程获取task超过60s,还没有task,这个线程会被销毁。
- Dispatchers还另外管理着两个重要的参数,maxRequests:代表同时执行的最大请求数,maxRequestsPerHost:一个Host最多同时请求的数目,设置maxRequestsPerHost是为了资源的均匀利用,避免一个host发过多请求,阻塞其他的host。
- Dispatchers提供自动补全maxRequests个request的功能,当调用setMaxRequests或finished方法的时候,如果runningAsyncCalls的个数不足maxRequests,会自动补全。
Dispatcher.finished方法,会在AsyncCall执行完成之后被调用。通过这个就可以实现,当线程池中任务执行完后,Dispatcher会及时将Runnable丢到线程池去执行。
Dispatcher是管理Runnable(AsyncCall)的,线程池也是管理Runnable的,那为什么Dispatcher不干脆来一个Call,就丢一个Call进线程池执行呢?反正线程池自己也会调用Runnable的执行。
因为Dispatcher想要控制一个host最多一次发5个Request,做到资源的均匀利用,才会在线程池外部又自己做了一套Runnable的管理。