newCall 实际上是创建了一个 RealCall 有三个参数:OkHttpClient(通用配置,超时时间等) Request(Http请求所用到的条件,url等) 布尔变量forWebSocket(webSocket是一种应用层的交互方式,可双向交互,一般用不到,除非需要频繁刷新数据,股票等。)
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
}
Request会被进行多次封装(所以构造函数里对象被命名为originRequest)
在进行newCall().enqueue()
,实际就是RealCall
的enqueue()
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//1
transmitter.callStart();
//2 关键
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
分别看1和2,主要看2
public void callStart() {
//跟踪程序错误
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
//eventListener是一个监听器,连接的接入和关闭,对程序进行监听
eventListener.callStart(call);
}
client.dispatcher()
返回一个Dispatcher
类对象,即线程调度器,然后用这个去进行异步操作,代入参数为一个AsyncCall
void enqueue(AsyncCall call) {
synchronized (this) {
//1
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
//2
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
//3
promoteAndExecute();
}
- 1的
readyAsyncCalls
是一个ArrayDeque<AsyncCall>
,存放 准备要执行但还没有执行,然后会在3的promoteAndExecute()
中执行
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
promoteAndExecute
会挑选那些不会导致超负载的call(不超过AsyncCall对应的maxRequest),放进executableCalls
和runningAsyncCalls
,然后去执行,就是去遍历executableCalls
然后执行。
分别执行就是把调用每一个asyncCall 的 executeOn()
:
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
核心只有一行:
executorService.execute(this);
这里就已经切换线程了,执行的都是传入的 executorService.对象 的execute()
方法,都会在后台执行
@Override protected void execute() {
boolean signalledCallback = false; // 标记回调是否已触发
transmitter.timeoutEnter(); // 进入超时处理逻辑
try {
Response response = getResponseWithInterceptorChain(); // 调用拦截器链获取response
signalledCallback = true; // 标记回调已触发
responseCallback.onResponse(RealCall.this, response); // 调用response的回调函数
} catch (IOException e) {
if (signalledCallback) {
// 不要重复触发回调!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel(); // 取消请求
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException); // 调用失败回调函数
}
throw t;
} finally {
client.dispatcher().finished(this); // 请求结束,将请求移出调度队列
}
}
其中回调函数就是当初我们在应用层所定义的Callback里边定义的onFailure()
和 onResponse()
,然后如果出现异常,会进行调用相应的方法。