This article walks through the internal implementation of OkHttp by reading its source code. The source is based on okhttp:3.10.0.

Asynchronous requests

An asynchronous request example

Let's start with an example of an asynchronous request:

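// (1) Configure the parameters with the builder pattern and build the Request object
Request request = new Request.Builder()
    .url("http://baidu.com?key=values")
    .get()
    .build();
// (2) Build the OkHttpClient object with its builder
new OkHttpClient.Builder().build()
    .newCall(request) // (3) Takes the request and returns a RealCall
    .enqueue(new okhttp3.Callback() { // (4) Request callback
      @Override
      public void onFailure(Call call, IOException e) {
        System.out.println(e.getMessage());
      }

      @Override
      public void onResponse(Call call, Response response) throws IOException {
        // The callback runs on one of OkHttp's worker threads, not on the caller's thread.
        System.out.println("thread:" + Thread.currentThread());
        if (response.isSuccessful()) {
          ResponseBody responseBody = response.body();
          // `data` is assumed to be a text widget owned by the caller; on Android a view
          // must be updated from the main thread, so real code would post this call there.
          data.setText(responseBody.string());
        }
      }
    });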

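The code above issues an asynchronous OkHttp request. It first builds a Request object, which carries the URL, the HTTP method, the headers and, for non-GET requests, an optional body. It then creates an OkHttpClient and calls newCall with the request to obtain a RealCall. Finally, RealCall.enqueue starts the asynchronous request and registers the callback. In real projects the OkHttpClient should be shared as a single instance; the reason is given later in this article.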

(1) First, a look inside Request:

/**
 * An HTTP request. Instances of this class are immutable if their {@link #body} is null or itself
 * immutable.
 */
public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final @Nullable RequestBody body;
  final Object tag;

  private volatile CacheControl cacheControl; // Lazily initialized.

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
  }

  // ... accessors omitted in this excerpt ...

  public static class Builder {
    // ... fields and the other setters omitted in this excerpt ...

    /**
     * Attaches {@code tag} to the request. It can be used later to cancel the request. If the tag
     * is unspecified or null, the request is canceled by using the request itself as the tag.
     */
    public Builder tag(Object tag) {
      this.tag = tag;
      return this;
    }

    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }
  }
}

A Request holds the request parameters: url (the request address), method (the HTTP method), headers, the request body, and a tag.
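
The builder-plus-immutable-object shape of Request can be sketched without the library. The class below is a hypothetical, simplified stand-in (SimpleRequest and its string-based headers are assumptions made for illustration, not OkHttp types); it only mirrors the two details visible in the excerpt above: build() rejects a missing url, and a missing tag falls back to the request itself.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for okhttp3.Request; not the library class. */
final class SimpleRequest {
  final String url;
  final String method;
  final Map<String, String> headers;
  final Object tag;

  private SimpleRequest(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    // Copy the headers so later changes to the builder cannot leak into the built request.
    this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(builder.headers));
    // Same fallback as in the excerpt: no tag means the request is its own tag.
    this.tag = builder.tag != null ? builder.tag : this;
  }

  static final class Builder {
    private String url;
    private String method = "GET";
    private final Map<String, String> headers = new LinkedHashMap<>();
    private Object tag;

    Builder url(String url) { this.url = url; return this; }
    Builder method(String method) { this.method = method; return this; }
    Builder header(String name, String value) { headers.put(name, value); return this; }
    Builder tag(Object tag) { this.tag = tag; return this; }

    SimpleRequest build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new SimpleRequest(this);
    }
  }

  public static void main(String[] args) {
    SimpleRequest request = new SimpleRequest.Builder()
        .url("http://baidu.com?key=values")
        .header("Accept", "text/html")
        .build();
    System.out.println(request.method + " " + request.url);
    System.out.println("tag is the request itself: " + (request.tag == request));
  }
}
```

Because every field is final and the header map is copied, a built request can be shared between threads as long as nothing it references is mutated afterwards.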

(2) OkHttpClient is the main entry point exposed to callers. It is also created with the builder pattern; the part to focus on here is its constructor:

OkHttpClient(Builder builder) {
  this.dispatcher = builder.dispatcher;//(1) dispatcher
  this.proxy = builder.proxy;//(2) proxy
  this.protocols = builder.protocols;//(3) protocols
  this.connectionSpecs = builder.connectionSpecs;//(4) connection specs: determine the TLS versions and cipher suites
  this.interceptors = Util.immutableList(builder.interceptors);//(5) custom application interceptors
  this.networkInterceptors = Util.immutableList(builder.networkInterceptors);//(6) custom network interceptors
  this.eventListenerFactory = builder.eventListenerFactory;//(7) event listener factory
  this.proxySelector = builder.proxySelector;//(8) proxy selector
  this.cookieJar = builder.cookieJar;
  this.cache = builder.cache;//(9) cache
  this.internalCache = builder.internalCache;//(10) internal cache
  this.socketFactory = builder.socketFactory;//(11) socket factory

  boolean isTLS = false;
  for (ConnectionSpec spec : connectionSpecs) {
    isTLS = isTLS || spec.isTls();
  }

  if (builder.sslSocketFactory != null || !isTLS) {
    this.sslSocketFactory = builder.sslSocketFactory;
    this.certificateChainCleaner = builder.certificateChainCleaner;
  } else {
    X509TrustManager trustManager = systemDefaultTrustManager();
    this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
    this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
  }

  this.hostnameVerifier = builder.hostnameVerifier;
  this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
      certificateChainCleaner);
  this.proxyAuthenticator = builder.proxyAuthenticator;
  this.authenticator = builder.authenticator;
  this.connectionPool = builder.connectionPool;//(12) connection pool
  this.dns = builder.dns;
  this.followSslRedirects = builder.followSslRedirects;
  this.followRedirects = builder.followRedirects;
  this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
  this.connectTimeout = builder.connectTimeout;
  this.readTimeout = builder.readTimeout;
  this.writeTimeout = builder.writeTimeout;
}

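(12) OkHttpClient holds the connection pool, and it also holds the thread pool (inside the dispatcher) and the response cache. Every new client built from a fresh Builder gets its own set of these resources, so the client should be used as a singleton.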

(3) Next, OkHttpClient.newCall(request) is called:

/**
 * Prepares the {@code request} to be executed at some point in the future.
 */
@Override public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}

newCall calls RealCall.newRealCall internally and returns the resulting Call object:

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  RealCall call = new RealCall(client, originalRequest, forWebSocket);//(1)
  call.eventListener = client.eventListenerFactory().create(call);//(2)
  return call;
}

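(1) Creates the RealCall object. RealCall holds the OkHttpClient and the Request, and its constructor also creates the retry/redirect interceptor RetryAndFollowUpInterceptor;
(2) Creates the call's eventListener from the OkHttpClient's eventListenerFactory and assigns it to the call.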

(4) The asynchronous request is started by RealCall.enqueue:

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");//(1)
    executed = true;
  }
  captureCallStackTrace();//(2)
  eventListener.callStart(this);//(3)
  client.dispatcher().enqueue(new AsyncCall(responseCallback));//(4)
}

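(1) If the call has already been executed, an exception is thrown;
(2) Captures a stack trace for this call, which is handed to the retry/redirect interceptor so that a response body that is never closed can be traced back to where the call was made;
(3) Fires the callStart() listener method to signal that the request has started;
(4) Calls the dispatcher's enqueue method with a new AsyncCall that wraps responseCallback.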
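
Step (1) is a one-shot guard: the flag is checked and set under a lock, and the actual work is handed off outside the lock. A minimal sketch of that pattern follows; OneShotCall is a hypothetical class for illustration only, and it runs the work inline instead of handing it to a dispatcher.

```java
/** Hypothetical sketch of a call that may be started only once; not OkHttp's RealCall. */
final class OneShotCall {
  private boolean executed; // Guarded by this.

  void enqueue(Runnable work) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    // Outside the lock: hand the work to whatever scheduler is in use.
    // Running it inline keeps the sketch self-contained.
    work.run();
  }

  public static void main(String[] args) {
    OneShotCall call = new OneShotCall();
    call.enqueue(() -> System.out.println("first enqueue runs"));
    try {
      call.enqueue(() -> System.out.println("never printed"));
    } catch (IllegalStateException e) {
      System.out.println("second enqueue rejected: " + e.getMessage());
    }
  }
}
```

Keeping the synchronized block this small means two threads racing to enqueue the same call cannot both pass the check, while the lock is never held during network work.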

Now look at the dispatcher's enqueue method:

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {//(1)
    runningAsyncCalls.add(call);//(2)
    executorService().execute(call);//(3)
  } else {
    readyAsyncCalls.add(call);//(4)
  }
}

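(1) The condition for adding the call to the running queue: the number of running asynchronous requests is below the maximum (64 by default), and the number of running asynchronous requests to the same host is below the per-host maximum (5 by default). Otherwise the call goes to the ready queue;
(2) When (1) holds, the call is added to the running queue;
(3) The call is handed to the thread pool for execution;
(4) When (1) does not hold, the call is added to the ready queue.
AsyncCall is a Runnable (through its NamedRunnable base class, whose run() calls execute()); its execute method carries out the request and delivers the result.
AsyncCall.execute():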

@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    Response response = getResponseWithInterceptorChain();//(1)
    if (retryAndFollowUpInterceptor.isCanceled()) {//(2)
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);//(3)
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);//(4)
      responseCallback.onFailure(RealCall.this, e);//(5)
    }
  } finally {
    client.dispatcher().finished(this);//(6)
  }
}

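(1) getResponseWithInterceptorChain returns the response object. It is the core of the implementation and is covered in detail below;
(2) If the request was canceled, responseCallback.onFailure is called to report the failure;
(3) Otherwise the request completed normally: responseCallback.onResponse delivers the response object, and the caller can read the returned data and run its business logic;
(4)(5) If an IOException is thrown before the callback has been signalled, eventListener.callFailed and responseCallback.onFailure are invoked;
(6) The dispatcher's finished method is called. Internally, finished calls promoteCalls, which takes calls from the readyAsyncCalls queue, adds them to runningAsyncCalls, and submits them to the thread pool via executorService().execute(call). A call is promoted only while runningAsyncCalls holds fewer than the maximum number of calls and the number of running calls to the same host is below maxRequestsPerHost (the per-host maximum).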
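
The admission and promotion rules described above can be modelled in a few lines. This is a sketch under simplifying assumptions, not the Dispatcher source: DispatcherSketch is a hypothetical class, a call is represented only by its host name, and nothing is actually executed. It shows how the two limits decide between the running and ready queues, and how finishing a call promotes parked ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model of the two-queue admission logic; host names stand in for calls. */
final class DispatcherSketch {
  private final int maxRequests;
  private final int maxRequestsPerHost;
  private final Deque<String> ready = new ArrayDeque<>();
  private final Deque<String> running = new ArrayDeque<>();

  DispatcherSketch(int maxRequests, int maxRequestsPerHost) {
    this.maxRequests = maxRequests;
    this.maxRequestsPerHost = maxRequestsPerHost;
  }

  /** Returns true if the call starts running immediately, false if it is parked. */
  synchronized boolean enqueue(String host) {
    if (running.size() < maxRequests && runningForHost(host) < maxRequestsPerHost) {
      running.add(host);
      return true;
    }
    ready.add(host);
    return false;
  }

  /** Marks one running call to host as done; returns the hosts promoted from the ready queue. */
  synchronized List<String> finished(String host) {
    if (!running.remove(host)) throw new IllegalStateException("Call wasn't in-flight!");
    List<String> promoted = new ArrayList<>();
    for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
      if (running.size() >= maxRequests) break; // Already running at capacity.
      String candidate = i.next();
      if (runningForHost(candidate) < maxRequestsPerHost) {
        i.remove();
        running.add(candidate);
        promoted.add(candidate);
      }
    }
    return promoted;
  }

  private int runningForHost(String host) {
    int count = 0;
    for (String h : running) {
      if (h.equals(host)) count++;
    }
    return count;
  }

  public static void main(String[] args) {
    DispatcherSketch dispatcher = new DispatcherSketch(2, 1);
    System.out.println(dispatcher.enqueue("a.example"));  // admitted
    System.out.println(dispatcher.enqueue("a.example"));  // parked by the per-host limit
    System.out.println(dispatcher.enqueue("b.example"));  // admitted
    System.out.println(dispatcher.finished("a.example")); // the parked call is promoted
  }
}
```

The per-host limit is what keeps one slow server from occupying every slot: calls to other hosts can still be admitted or promoted while calls to the saturated host wait in the ready queue.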

  • Next, the core method getResponseWithInterceptorChain():
Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());//(1)
  interceptors.add(retryAndFollowUpInterceptor);//(2)
  interceptors.add(new BridgeInterceptor(client.cookieJar()));//(3)
  interceptors.add(new CacheInterceptor(client.internalCache()));//(4)
  interceptors.add(new ConnectInterceptor(client));//(5)
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());//(6)
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));//(7)

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
      originalRequest, this, eventListener, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  return chain.proceed(originalRequest);//(8)
}

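(1) Build the full list of interceptors, starting with the custom application interceptors;
(2) Add the retry/redirect interceptor;
(3) Add the bridge interceptor;
(4) Add the cache interceptor;
(5) Add the connect interceptor;
(6) If this is not a WebSocket call, add the custom network interceptors;
(7) Add the call-server interceptor;
(8) Create the Interceptor.Chain from the interceptor list, the request, the call object, the event listener, and the connect, read and write timeouts, then call chain.proceed(originalRequest).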

chain.proceed(originalRequest) delegates to the four-argument proceed overload:

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.httpCodec != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
      connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
      writeTimeout);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);//(1)

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }

  return response;
}

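(1) proceed creates the next RealInterceptorChain (with index + 1) and passes it to the current interceptor's intercept method. Inside intercept, the interceptor calls proceed on that next chain, which in turn creates another RealInterceptorChain and invokes the following interceptor, and so on. Every interceptor is visited this way until the last one, CallServerInterceptor, which does not call proceed and returns the response; the response then travels back through the interceptors in reverse order. The traversal flow is shown in the figure below.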
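
The recursion is easier to see in a stripped-down model. The sketch below is a hypothetical miniature (ChainSketch and its nested types are assumptions for illustration, with plain strings standing in for Request and Response); it keeps only the index-based hand-off and the rule that the last interceptor produces the response without calling proceed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the interceptor chain; requests and responses are strings. */
final class ChainSketch {
  interface Interceptor {
    String intercept(Chain chain);
  }

  static final class Chain {
    private final List<Interceptor> interceptors;
    private final int index;
    private final String request;

    Chain(List<Interceptor> interceptors, int index, String request) {
      this.interceptors = interceptors;
      this.index = index;
      this.request = request;
    }

    String request() {
      return request;
    }

    String proceed(String request) {
      if (index >= interceptors.size()) throw new AssertionError();
      // Hand the current interceptor a chain that points one position further along.
      Chain next = new Chain(interceptors, index + 1, request);
      String response = interceptors.get(index).intercept(next);
      if (response == null) throw new NullPointerException("interceptor returned null");
      return response;
    }
  }

  static String run(String request) {
    List<Interceptor> interceptors = new ArrayList<>();
    // Each non-terminal interceptor may rewrite the request, must call proceed(),
    // and may rewrite the response on the way back.
    interceptors.add(chain -> "[retry " + chain.proceed(chain.request()) + "]");
    interceptors.add(chain -> "[bridge " + chain.proceed(chain.request() + "+headers") + "]");
    // Terminal interceptor: produces the response and does not call proceed().
    interceptors.add(chain -> "response(" + chain.request() + ")");
    return new Chain(interceptors, 0, request).proceed(request);
  }

  public static void main(String[] args) {
    System.out.println(run("GET /"));
  }
}
```

The nesting of the printed result mirrors the call stack: the request is modified on the way down the list, and each interceptor wraps the response on the way back up.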

Next, the code of the built-in interceptors:

  • RetryAndFollowUpInterceptor:
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);//(1)
this.streamAllocation = streamAllocation;

int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();//(2)
throw new IOException("Canceled");
}

Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);//(3)
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();//(4)
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;//(5)
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {//(6)
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}

// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {//(7)
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}

Request followUp = followUpRequest(response, streamAllocation.route());//(8)

if (followUp == null) {//(9)
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}

closeQuietly(response.body());//(10)

if (++followUpCount > MAX_FOLLOW_UPS) {//(11)
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (followUp.body() instanceof UnrepeatableRequestBody) {//(12)
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}

if (!sameConnection(response, followUp.url())) {//(13)
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {//(14)
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}

request = followUp;//(15)
priorResponse = response;//(16)
}
}

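(1) StreamAllocation coordinates connections (Connections), streams (Streams) and calls (Calls);
(2) If the request has been canceled, streamAllocation.release() is called and an IOException is thrown. release() releases the connection held by this allocation (closing the socket if the connection is no longer usable) and calls back eventListener.connectionReleased;
(3) Calls realChain.proceed. proceed creates the next chain and passes it to the next interceptor, whose intercept method does its own part of the work and calls chain.proceed() in turn;
(4) If realChain.proceed throws a RouteException, recover is called. If recover returns false, the connection is not retried and an IOException is thrown; for an asynchronous call it is caught in AsyncCall.execute, which invokes eventListener.callFailed and onFailure. recover returns false in the following cases:

1. The application layer has forbidden retries;
2. The request body cannot be sent again;
3. The caught exception is fatal;
4. There are no more routes to try.

If none of these four cases applies, the request is retried.

(5) For an IOException, recover is called as well, and whether to reconnect is decided by the same logic as in (4);
(6) If an exception propagates without being handled by the catch blocks above, streamAllocation.streamFailed(null) and streamAllocation.release() are called;
(7) priorResponse is the response obtained in the previous iteration. If it is not null, it is attached to the current response with its body removed;
(8) followUpRequest decides from the response code whether a follow-up request (for example a redirect) is needed; it returns null if none is needed;
(9) If no follow-up is needed, the connection is released with streamAllocation.release() (unless this is a WebSocket call) and the response is returned; otherwise the logic below runs;
(10) Closes the response body;
(11) If the number of follow-ups exceeds the maximum (MAX_FOLLOW_UPS), the connection is released and an exception is thrown;
(12) If the follow-up request body is an UnrepeatableRequestBody and therefore cannot be sent again, the connection is released and an exception is thrown;
(13) Checks whether the follow-up can use the same connection; if not, the current StreamAllocation is released and a new one is created;
(14) If the same connection is used but the codec is still non-null (closing the response body did not close its stream), an exception is thrown;
(15) The follow-up request is assigned to request for the next iteration of the while loop;
(16) The current response is saved as priorResponse.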
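
The four conditions listed under (4) amount to a short decision function. The sketch below is a simplified model, not the library source: RecoverSketch is hypothetical, the client and request state are passed in as plain booleans, and the classification of "fatal" exceptions in isRecoverable is an assumption chosen for illustration (protocol errors, interrupted I/O other than a connect timeout, and certificate problems are treated as fatal).

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;

/** Hypothetical model of the retry decision; state is passed in as plain flags. */
final class RecoverSketch {
  static boolean recover(IOException e, boolean retryOnConnectionFailure,
      boolean requestSendStarted, boolean bodyIsUnrepeatable, boolean hasMoreRoutes) {
    // 1. The application layer has forbidden retries.
    if (!retryOnConnectionFailure) return false;
    // 2. The request body cannot be sent again.
    if (requestSendStarted && bodyIsUnrepeatable) return false;
    // 3. The exception is fatal.
    if (!isRecoverable(e, requestSendStarted)) return false;
    // 4. There are no more routes to try.
    if (!hasMoreRoutes) return false;
    return true;
  }

  /** Assumed classification of fatal exceptions; adjust to the rules that actually apply. */
  static boolean isRecoverable(IOException e, boolean requestSendStarted) {
    if (e instanceof ProtocolException) return false;
    if (e instanceof InterruptedIOException) {
      // Only a timeout before anything was sent is worth another attempt.
      return e instanceof SocketTimeoutException && !requestSendStarted;
    }
    if (e instanceof SSLHandshakeException && e.getCause() instanceof CertificateException) {
      return false;
    }
    if (e instanceof SSLPeerUnverifiedException) return false;
    return true;
  }

  public static void main(String[] args) {
    IOException reset = new IOException("connection reset");
    System.out.println(recover(reset, true, false, false, true));
    System.out.println(recover(reset, false, false, false, true));
    System.out.println(recover(new ProtocolException("bad status line"), true, false, false, true));
  }
}
```

The order of the checks matters only for cost, not for the result: any single failing condition is enough to stop the retry.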

  • BridgeInterceptor:
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());//(1)
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

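The bridge interceptor's main functions:
1. Bridging application code to network code;
2. Converting the user request into a network request by filling in headers such as Content-Type, Content-Length or Transfer-Encoding, Host, Connection, Accept-Encoding, Cookie and User-Agent, and converting the network response back into a user response, including transparent gzip decompression.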
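
The header defaulting done on the request side can be sketched on a plain map. BridgeSketch below is a hypothetical helper for illustration, not the interceptor itself: it leaves out cookies and the Content-Type header, takes the host as a parameter, and uses a placeholder User-Agent value. A null contentLength means "no body" and -1 means "body of unknown length", mirroring the branches in the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the request-side header defaulting; not the OkHttp class. */
final class BridgeSketch {
  /**
   * Returns a copy of userHeaders with missing transport headers filled in.
   * contentLength is null when there is no body and -1 when the length is unknown.
   */
  static Map<String, String> toNetworkHeaders(
      Map<String, String> userHeaders, String host, Long contentLength) {
    // Header names are case-insensitive, so compare them that way.
    Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    headers.putAll(userHeaders);

    if (contentLength != null) {
      long length = contentLength;
      if (length != -1L) {
        headers.put("Content-Length", Long.toString(length));
        headers.remove("Transfer-Encoding");
      } else {
        headers.put("Transfer-Encoding", "chunked");
        headers.remove("Content-Length");
      }
    }

    headers.putIfAbsent("Host", host);
    headers.putIfAbsent("Connection", "Keep-Alive");

    // Ask for gzip only if the caller did not choose an encoding and is not
    // requesting a byte range; the caller of this sketch would then have to
    // decompress the response itself.
    if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
      headers.put("Accept-Encoding", "gzip");
    }

    headers.putIfAbsent("User-Agent", "bridge-sketch/0.1"); // Placeholder value.
    return headers;
  }

  public static void main(String[] args) {
    Map<String, String> user = new LinkedHashMap<>();
    user.put("accept", "text/html");
    System.out.println(toNetworkHeaders(user, "example.com", null));
    System.out.println(toNetworkHeaders(user, "example.com", -1L));
  }
}
```

Headers the caller set explicitly are never overwritten, except for the Content-Length and Transfer-Encoding pair, which is always derived from the body so that the two cannot contradict each other.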

  • CacheInterceptor
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();

//(1)
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}

if (cacheCandidate != null && cacheResponse == null) {//(2)
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {//(3)
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// If we don't need the network, we're done.
if (networkRequest == null) {//(4)
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());//(5)
}
}

// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {//(6)
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

//(7)
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);//(8)
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}

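(1) Obtains the cache strategy;
(2) If a cache candidate exists but the strategy's cache response is null, the candidate is not applicable and its body is closed;
(3) If the network is forbidden and no usable cached response exists, a 504 failure response is returned;
(4) If the network is not needed, the cached response is returned: a cache hit;
(5) If chain.proceed throws, the cache candidate's body is closed so that it does not leak;
(6) If there is a cached response as well, this is a conditional GET: when the network returns 304 (HTTP_NOT_MODIFIED), the cached response is returned with merged headers and the cache is updated; otherwise the cached body is closed;
(7) Uses the network response;
(8) Offers the response to the cache (adds it to the cache).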
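
Reduced to its branches, the interceptor picks one of four outcomes from two inputs of the cache strategy plus the network status code. The sketch below is a hypothetical decision table (CacheDecisionSketch and its Outcome enum are names invented for illustration); it deliberately ignores body handling and cache writes.

```java
/** Hypothetical decision table for the branches described above. */
final class CacheDecisionSketch {
  enum Outcome { UNSATISFIABLE_504, SERVE_CACHE, REVALIDATED_CACHE, SERVE_NETWORK }

  /**
   * needsNetwork corresponds to strategy.networkRequest != null,
   * hasCache to strategy.cacheResponse != null. networkCode is the status
   * code returned by the network and is ignored when no network call is made.
   */
  static Outcome decide(boolean needsNetwork, boolean hasCache, int networkCode) {
    if (!needsNetwork && !hasCache) return Outcome.UNSATISFIABLE_504; // step (3)
    if (!needsNetwork) return Outcome.SERVE_CACHE;                    // step (4)
    if (hasCache && networkCode == 304) return Outcome.REVALIDATED_CACHE; // step (6)
    return Outcome.SERVE_NETWORK;                                     // step (7)
  }

  public static void main(String[] args) {
    System.out.println(decide(false, false, 0));
    System.out.println(decide(false, true, 0));
    System.out.println(decide(true, true, 304));
    System.out.println(decide(true, true, 200));
    System.out.println(decide(true, false, 200));
  }
}
```

Only the last outcome can lead to a cache write, which is why step (8) appears after the network response has been chosen.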

  • ConnectInterceptor
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  StreamAllocation streamAllocation = realChain.streamAllocation();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
  RealConnection connection = streamAllocation.connection();//(1)

  return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

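(1) streamAllocation.newStream returns the HttpCodec for this request; the RealConnection is then obtained from the StreamAllocation, and both are passed to proceed on the next chain.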

  • CallServerInterceptor
 @Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();

realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);//(1)
realChain.eventListener().requestHeadersEnd(realChain.call(), request);

Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}

if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

request.body().writeTo(bufferedRequestBody);//(2)
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}

httpCodec.finishRequest();

if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);//(3)
}

Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);

response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}

realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);

//(4)
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}

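(1) Writes the request headers;
(2) Writes the request body;
(3) Reads the response headers;
(4) Attaches the response body: an empty body for a WebSocket upgrade (code 101), otherwise the stream opened by httpCodec.openResponseBody.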
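
To make the order of steps (1) to (3) concrete, the sketch below serializes a request in HTTP/1.1 text form and extracts the status code from a status line. It is an illustration under stated assumptions only: WireSketch is a hypothetical helper, it writes to an in-memory buffer instead of a socket, and it covers neither HTTP/2 framing nor the Expect: 100-continue path handled by the real interceptor.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical HTTP/1.1 wire-format sketch; writes to memory, not to a socket. */
final class WireSketch {
  /** Serializes the request line and headers first, then the optional body. */
  static byte[] writeRequest(
      String method, String path, Map<String, String> headers, byte[] body) {
    StringBuilder head = new StringBuilder();
    head.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
    for (Map.Entry<String, String> header : headers.entrySet()) {
      head.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n");
    }
    head.append("\r\n"); // An empty line ends the header section.

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] headBytes = head.toString().getBytes(StandardCharsets.US_ASCII);
    out.write(headBytes, 0, headBytes.length); // Step (1): request headers.
    if (body != null) {
      out.write(body, 0, body.length);         // Step (2): request body.
    }
    return out.toByteArray();
  }

  /** Step (3), reduced to its first line: parses the code from "HTTP/1.1 200 OK". */
  static int parseStatusCode(String statusLine) {
    String[] parts = statusLine.split(" ", 3);
    if (parts.length < 2 || !parts[0].startsWith("HTTP/")) {
      throw new IllegalArgumentException("Unexpected status line: " + statusLine);
    }
    return Integer.parseInt(parts[1]);
  }

  public static void main(String[] args) {
    Map<String, String> headers = new LinkedHashMap<>();
    headers.put("Host", "example.com");
    headers.put("Content-Length", "2");
    byte[] wire = writeRequest("POST", "/submit", headers,
        "hi".getBytes(StandardCharsets.US_ASCII));
    System.out.print(new String(wire, StandardCharsets.US_ASCII));
    System.out.println();
    System.out.println(parseStatusCode("HTTP/1.1 200 OK"));
  }
}
```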

The complete call flow of an asynchronous request: