[TOC]

OkHttp

基于Socket, 和HttpUrlConnection同级但并不属于HttpUrlConnection(断点续传的关键)

目前相对主流的框架Retrofit,Glide中都是内置了OkHttp

构造函数

public Builder() {
dispatcher = new Dispatcher();// 调度者
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;// 传输层版本和连接协议
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。
certificatePinner = CertificatePinner.DEFAULT;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。
proxyAuthenticator = Authenticator.NONE;// 代理身份验证
authenticator = Authenticator.NONE;// 身份验证
connectionPool = new ConnectionPool();// 连接池,最大连接数量为5,空闲时间上限为5分钟;
dns = Dns.SYSTEM;
followSslRedirects = true;// 安全套接层重定向
followRedirects = true;// 本地重定向
retryOnConnectionFailure = true;// 重试连接失败
callTimeout = 0;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}

获取拦截器链

添加各种拦截器,然后建立拦截器链,来处理对请求对象的拦截操作。

Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}

拦截器

关键类为RealInterceptorChain。

一个具体的拦截器链,包含整个拦截器链:所有应用程序拦截器,OkHttp核心,所有网络拦截器,最后是网络调用者。如果该链用于应用程序拦截器,则connection必须为空。如果它是用于网络拦截器的,则connection必须为非空。

@Override public @Nullable Connection connection() {
return exchange != null ? exchange.connection() : null;
}

核心部分

包括常见异常日志打印;拦截器的链式调用;责任链模式;

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();

calls++;

// If we already have a stream, confirm that the incoming request will use it.
if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.exchange != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}

// 调用链中的下一个拦截器。
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

// Confirm that the next interceptor made its required call to chain.proceed().
if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}

if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}

return response;
}

异步请求

Request request = new Request.Builder()
.url(url)
.build();

client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
e.printStackTrace();
}

@Override
public void onResponse(Call call, Response response) throws IOException {
...
}
)

源码分析

void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call); //准备好异步调用的运行顺序。

//对AsyncCall进行突变,以使其将现有正在运行的调用的AtomicInteger共享给同一主机。
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}

相关异步回调

//准备好异步调用的运行顺序。
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

//运行异步调用。包括尚未结束的已取消呼叫。
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

//运行同步呼叫。包括尚未结束的已取消呼叫。
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

promoteAndExecute

将合格的回调从readyAsyncCalls更新到runningAsyncCalls,并在执行程序服务上运行它们。

不能以同步方式调用,因为执行调用可以调用用户代码。

@return如果调度程序当前正在运行回调,则为true。

private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();

if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}