/**
 * An HTTP request: target URL, method, headers, optional body and caller-supplied tags.
 *
 * The primary constructor is `internal`, so code outside the module cannot instantiate this
 * class directly.
 */
class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,
  internal val tags: Map<Class<*>, Any>
) {
  /** Memoized result of parsing [headers]; stays null until [cacheControl] is first read. */
  private var lazyCacheControl: CacheControl? = null

  /** Cache directives parsed from [headers] on first access and reused on later reads. */
  val cacheControl: CacheControl
    get() {
      var result = lazyCacheControl
      if (result == null) {
        result = CacheControl.parse(headers)
        lazyCacheControl = result
      }
      return result
    }

  /** Mirrors the `isHttps` property of [url]. */
  val isHttps: Boolean
    get() = url.isHttps
}
Request 的主构造函数被声明为 internal,外部代码无法直接 new;又因为需要配置的参数比较多,所以通过 Builder 设计模式来创建。
1 2 3 4 5 6 7 8 9 10
// Assemble a request through the builder and finish with build().
// HTTP method tokens are case-sensitive, so the method is spelled "GET" in upper case.
Request.Builder()
  .url("url地址")
  .addHeader(name = "key", value = "value")
  .method(method = "GET", body = null)
  .tag(type = String::class.java, tag = "tag")
  .cacheControl(
    CacheControl.Builder()
      .noCache()
      .build()
  )
  .build()
/** Removes every header called [name] by forwarding to `headers.removeAll`. */
open fun removeHeader(name: String) = apply {
  headers.removeAll(name)
}

/** Removes every name/value pair whose name equals [name], ignoring case. */
fun removeAll(name: String) = apply {
  // The list is walked two slots at a time: slot i is compared as the name and
  // slot i + 1 is removed together with it as the value.
  var i = 0
  while (i < namesAndValues.size) {
    if (name.equals(namesAndValues[i], ignoreCase = true)) {
      namesAndValues.removeAt(i) // name
      namesAndValues.removeAt(i) // value
      i -= 2 // Cancel the increment below so the pair that shifted into slot i is checked too.
    }
    i += 2
  }
}
/** Returns a new request body that transmits the content of this. */ @JvmStatic @JvmName("create") fun File.asRequestBody(contentType: MediaType? = null): RequestBody { returnobject : RequestBody() { overridefuncontentType() = contentType
/**
 * Collects parts and assembles them into a [MultipartBody].
 *
 * @param boundary boundary text, stored UTF-8 encoded; defaults to a random UUID string.
 */
class Builder @JvmOverloads constructor(boundary: String = UUID.randomUUID().toString()) {
  private val boundary: ByteString = boundary.encodeUtf8()
  private var type = MIXED
  private val parts = mutableListOf<Part>()

  /**
   * Set the MIME type. Expected values for `type` are [MIXED] (the default), [ALTERNATIVE],
   * [DIGEST], [PARALLEL] and [FORM].
   *
   * @throws IllegalArgumentException if the top-level type of [type] is not `multipart`.
   */
  fun setType(type: MediaType) = apply {
    require(type.type == "multipart") { "multipart != $type" }
    this.type = type
  }

  /** Add a part to the body. */
  fun addPart(body: RequestBody) = apply {
    addPart(Part.create(body))
  }

  /** Add a part to the body. */
  fun addPart(headers: Headers?, body: RequestBody) = apply {
    addPart(Part.create(headers, body))
  }

  /** Add a form data part to the body. */
  fun addFormDataPart(name: String, value: String) = apply {
    addPart(Part.createFormData(name, value))
  }

  /** Add a form data part to the body. */
  fun addFormDataPart(name: String, filename: String?, body: RequestBody) = apply {
    addPart(Part.createFormData(name, filename, body))
  }

  /** Add a part to the body. */
  fun addPart(part: Part) = apply {
    parts += part
  }

  /**
   * Assemble the specified parts into a request body.
   *
   * @throws IllegalStateException if no part has been added.
   */
  fun build(): MultipartBody {
    check(parts.isNotEmpty()) { "Multipart body must have at least one part." }
    return MultipartBody(boundary, type, parts.toImmutableList())
  }
}
classRealResponseBody( /** * Use a string to avoid parsing the content type until needed. This also defers problems caused * by malformed content types. */ privateval contentTypeString: String?, privateval contentLength: Long, privateval source: BufferedSource ) : ResponseBody() {
/** Holds the client, the original request and the web-socket flag that belong to one call. */
class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
)
overridefunintercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain var request = chain.request val call = realChain.call var followUpCount = 0 var priorResponse: Response? = null var newExchangeFinder = true var recoveredFailures = listOf<IOException>() while (true) { // 为网络请求创建连接 call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response var closeActiveExchange = true try { //处理取消 if (call.isCanceled()) { throw IOException("Canceled") }
try { // 向前推进 response = realChain.proceed(request) // 时候开启新的连接 newExchangeFinder = true } catch (e: RouteException) { // 尝试恢复连接 if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { //恢复失败抛异常 throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } // 不需要重新建立连接 newExchangeFinder = false continue } catch (e: IOException) { // An attempt to communicate with a server failed. The request may have been sent. if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue }
// Attach the prior response if it exists. Such responses never have a body. // 合并前一个连接 if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build() }
val exchange = call.interceptorScopedExchange // 重定向request val followUp = followUpRequest(response, exchange)
if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response }
val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response }
//客户端超时 408 HTTP_CLIENT_TIMEOUT -> { // 408's are rare in practice, but some servers like HAProxy use this response code. The // spec says that we may repeat the request without modifications. Modern browsers also // repeat the request (even non-idempotent ones.) if (!client.retryOnConnectionFailure) { // The application layer has directed us not to retry the request. returnnull }
val requestBody = userResponse.request.body if (requestBody != null && requestBody.isOneShot()) { returnnull } val priorResponse = userResponse.priorResponse if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) { // We attempted to retry and got another timeout. Give up. returnnull }
if (retryAfter(userResponse, 0) > 0) { returnnull }
return userResponse.request }
//服务不用 HTTP_UNAVAILABLE -> { val priorResponse = userResponse.priorResponse if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) { // We attempted to retry and got another timeout. Give up. returnnull }
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) { // specifically received an instruction to retry without delay return userResponse.request }
returnnull }
//There are too many connections from your internet address 421 HTTP_MISDIRECTED_REQUEST -> { // OkHttp can coalesce HTTP/2 connections even if the domain names are different. See // RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then // we can retry on a different connection. val requestBody = userResponse.request.body if (requestBody != null && requestBody.isOneShot()) { returnnull }
if (exchange == null || !exchange.isCoalescedConnection) { returnnull }
/**
 * Answers the request from the cache when the computed cache strategy allows it; otherwise
 * forwards it down the chain and afterwards stores, updates or invalidates the cache entry.
 *
 * Outcomes, in order:
 * - neither a network request nor a cache response is available: a synthetic
 *   `HTTP_GATEWAY_TIMEOUT` response is returned;
 * - no network request is needed: the cached response is returned;
 * - the network answers `HTTP_NOT_MODIFIED` to a conditional request: cached and network headers
 *   are combined, the cache entry is updated and the merged response is returned;
 * - otherwise the network response is returned, written to the cache if it is cacheable.
 */
override fun intercept(chain: Interceptor.Chain): Response {
  val call = chain.call()
  // Look up a cached candidate for this request.
  val cacheCandidate = cache?.get(chain.request())

  val now = System.currentTimeMillis()
  // Compute the HTTP cache strategy.
  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  // Request to send over the network. Null means either a cache hit or an only-if-cached request
  // that has no usable cache entry; non-null means the later interceptors should handle it.
  val networkRequest = strategy.networkRequest
  // Cached response selected by the strategy; null means nothing usable is cached.
  val cacheResponse = strategy.cacheResponse
  // Let the cache record its statistics for this strategy.
  cache?.trackResponse(strategy)
  // Event listener of the call, or the no-op listener when the call is not a RealCall.
  val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

  if (cacheCandidate != null && cacheResponse == null) {
    // The cache candidate wasn't applicable. Close it.
    cacheCandidate.body?.closeQuietly()
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
  if (networkRequest == null && cacheResponse == null) {
    return Response.Builder()
      .request(chain.request())
      .protocol(Protocol.HTTP_1_1)
      .code(HTTP_GATEWAY_TIMEOUT)
      .message("Unsatisfiable Request (only-if-cached)")
      .body(EMPTY_RESPONSE)
      .sentRequestAtMillis(-1L)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build().also {
        listener.satisfactionFailure(call, it)
      }
  }

  // If we don't need the network, we're done: return the cached response directly.
  if (networkRequest == null) {
    return cacheResponse!!.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .build().also {
        listener.cacheHit(call, it)
      }
  }

  var networkResponse: Response? = null
  try {
    // Hand the request to the remaining interceptors.
    networkResponse = chain.proceed(networkRequest)
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      cacheCandidate.body?.closeQuietly()
    }
  }

  // If we have a cache response too, then we're doing a conditional get.
  if (cacheResponse != null) {
    if (networkResponse?.code == HTTP_NOT_MODIFIED) {
      // Validation says the resource is unchanged: answer from the cache and refresh the entry.
      val response = cacheResponse.newBuilder()
        .headers(combine(cacheResponse.headers, networkResponse.headers))
        .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
        .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

      networkResponse.body!!.close()

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache!!.trackConditionalCacheHit()
      cache.update(cacheResponse, response)
      return response.also {
        listener.cacheHit(call, it)
      }
    } else {
      // Validation found that the resource changed: the cached body is no longer needed.
      cacheResponse.body?.closeQuietly()
    }
  }

  val response = networkResponse!!.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build()

  if (cache != null) {
    // The response promises a body and is cacheable: store it.
    if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      val cacheRequest = cache.put(response)
      return cacheWritingResponse(cacheRequest, response).also {
        if (cacheResponse != null) {
          // This will log a conditional cache miss only.
          listener.cacheMiss(call)
        }
      }
    }

    // Drop the cached entry when the request method invalidates it.
    if (HttpMethod.invalidatesCache(networkRequest.method)) {
      try {
        cache.remove(networkRequest)
      } catch (_: IOException) {
        // The cache cannot be written.
      }
    }
  }

  return response
}
缓存策略
1
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
1 2 3 4 5 6 7 8 9 10 11
funcompute(): CacheStrategy { val candidate = computeCandidate()
// We're forbidden from using the network and the cache is insufficient. // 如果networkRequest不为空,但request中设置了only-if-cached策略 if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) { return CacheStrategy(null, null) }
privatefuncomputeCandidate(): CacheStrategy { // No cached response. // 如果没有缓存 if (cacheResponse == null) { return CacheStrategy(request, null) }
// Drop the cached response if it's missing a required handshake. // 如果请求式https,握手信息为null if (request.isHttps && cacheResponse.handshake == null) { return CacheStrategy(request, null) }
// If this response shouldn't have been stored, it should never be used as a response source. // This check should be redundant as long as the persistence store is well-behaved and the // rules are constant. // 如果不可以缓存 if (!isCacheable(cacheResponse, request)) { return CacheStrategy(request, null) }
val requestCaching = request.cacheControl // 如果缓存策略式no-cache,或有缓存限定 if (requestCaching.noCache || hasConditions(request)) { return CacheStrategy(request, null) } val responseCaching = cacheResponse.cacheControl // 当前缓存的请求的存活时间 val ageMillis = cacheResponseAge() // 缓存的最大生存时间 var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) { freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong())) }
var minFreshMillis: Long = 0 if (requestCaching.minFreshSeconds != -1) { minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong()) }
var maxStaleMillis: Long = 0 if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) { maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong()) } // 没有设置no-cache,并且 age + minFresh < freshMills + maxStale (及缓存没有完全失效) if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) { val builder = cacheResponse.newBuilder() if (ageMillis + minFreshMillis >= freshMillis) { // 缓存已经超过了max-age,但是还在maxStale范围内 builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"") } val oneDayMillis = 24 * 60 * 60 * 1000L if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) { // 无限期缓存,并且缓存已经超过一天了 builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"") } return CacheStrategy(null, builder.build()) }
// Find a condition to add to the request. If the condition is satisfied, the response body // will not be transmitted. val conditionName: String val conditionValue: String? when { // 如果有条件判断 etag != null -> { conditionName = "If-None-Match" conditionValue = etag }
// 如果连接可用,直接返回 // Confirm that the connection is good. if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate } // 如果连接不可用,不允许进一步运行 // If it isn't, take it out of the pool. candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes // would happen if we made a new connection and it immediately is detected as unhealthy. // 如果连接不可用,还有尝试的连接,继续寻找 if (nextRouteToTry != null) continue // 如果还存在路由项继续 val routesLeft = routeSelection?.hasNext() ?: true if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true if (routesSelectionLeft) continue
// Attempt to reuse the connection from the call. val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! // 尝试复用连接 if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } } // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here // because we already acquired it. if (call.connection != null) { check(toClose == null) return callConnection }
// The call's connection was released. toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) }
// We need a new connection. Give it fresh stats. refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0 // 尝试从连接池中获取 // Attempt to get a connection from the pool. if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
// Nothing in the pool. Figure out what route we'll try next. val routes: List<Route>? val route: Route // 从其他路由项中寻找 if (nextRouteToTry != null) { // Use a route from a preceding coalesced connection. routes = null route = nextRouteToTry!! nextRouteToTry = null } elseif (routeSelection != null && routeSelection!!.hasNext()) { // Use a route from an existing route selection. routes = null route = routeSelection!!.next() } else { // Compute a new route selection. This is a blocking operation! var localRouteSelector = routeSelector if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } val localRouteSelection = localRouteSelector.next() routeSelection = localRouteSelection routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. We have a better chance of matching thanks to connection coalescing. // 加入路由进行连接的复用 if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
route = localRouteSelection.next() } // 创建连接 // Connect. Tell the call about the connecting call so async cancels work. // 创建新的连接 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3 // different lookups in the connection pool! // 获取多路复用的连接即http/2连接。 if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { // 获取到了就放弃创建的连接 val result = call.connection!! nextRouteToTry = route newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } // 将新创建的连接放入连接池 synchronized(newConnection) { connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) }
/**
 * Writes the request to the exchange and reads the response from it.
 *
 * Steps: write the request headers; write the body when the method permits one and a body exists
 * (honouring `Expect: 100-continue`); read the response headers, reading again if the first
 * status is 100; attach the response body; mark the connection as not reusable when either side
 * sent `Connection: close`.
 *
 * @throws ProtocolException if a 204 or 205 response reports a content length greater than zero.
 */
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  // Collect what is needed from the chain.
  val realChain = chain as RealInterceptorChain
  val exchange = realChain.exchange!!
  val request = realChain.request
  val requestBody = request.body
  // Time at which the request was sent.
  val sentRequestMillis = System.currentTimeMillis()

  // Write the request headers.
  exchange.writeRequestHeaders(request)

  var invokeStartEvent = true
  var responseBuilder: Response.Builder? = null
  // Write the body when the method permits one and the body is not null.
  if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body. If we don't get that, return
    // what we did get (such as a 4xx response) without ever transmitting the request body.
    if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
      exchange.flushRequest()
      responseBuilder = exchange.readResponseHeaders(expectContinue = true)
      exchange.responseHeadersStart()
      invokeStartEvent = false
    }
    if (responseBuilder == null) {
      if (requestBody.isDuplex()) {
        // Duplex body: the sink is left open after writing.
        // Prepare a duplex body so that the application can send a request body later.
        exchange.flushRequest()
        val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
        requestBody.writeTo(bufferedRequestBody)
      } else {
        // Not duplex: close the sink once the data has been written.
        // Write the request body if the "Expect: 100-continue" expectation was met.
        val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
        requestBody.writeTo(bufferedRequestBody)
        bufferedRequestBody.close()
      }
    } else {
      exchange.noRequestBody()
      if (!exchange.connection.isMultiplexed) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        exchange.noNewExchangesOnConnection()
      }
    }
  } else {
    // No body to send.
    exchange.noRequestBody()
  }

  // The request is finished when there is no body or the body is not duplex.
  if (requestBody == null || !requestBody.isDuplex()) {
    exchange.finishRequest()
  }

  if (responseBuilder == null) {
    // Read the response headers.
    responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
    if (invokeStartEvent) {
      exchange.responseHeadersStart()
      invokeStartEvent = false
    }
  }

  // Attach the request, handshake, sent time and received time.
  var response = responseBuilder
    .request(request)
    .handshake(exchange.connection.handshake())
    .sentRequestAtMillis(sentRequestMillis)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build()
  var code = response.code
  if (code == 100) {
    // Server sent a 100-continue even though we did not request one. Try again to read the actual
    // response status.
    responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
    if (invokeStartEvent) {
      exchange.responseHeadersStart()
    }
    // Rebuild the response from the second set of headers.
    response = responseBuilder
      .request(request)
      .handshake(exchange.connection.handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build()
    code = response.code
  }

  exchange.responseHeadersEnd(response)

  response = if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response.newBuilder()
      .body(EMPTY_RESPONSE)
      .build()
  } else {
    // Open the response body.
    response.newBuilder()
      .body(exchange.openResponseBody(response))
      .build()
  }

  // Stop reusing the connection when the request or the response carries "Connection: close".
  if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
    "close".equals(response.header("Connection"), ignoreCase = true)
  ) {
    exchange.noNewExchangesOnConnection()
  }

  // The response code says there is no body, yet a positive content length was reported.
  if ((code == 204 || code == 205) && (response.body?.contentLength() ?: -1L) > 0L) {
    throw ProtocolException(
      "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}"
    )
  }

  return response
}
}
override fun intercept(chain: Interceptor.Chain): Response {
// ...... if (cache != null) { if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) { // Offer this request to the cache. // 存入缓存 valcacheRequest= cache.put(response) // 返回响应体 return cacheWritingResponse(cacheRequest, response).also { if (cacheResponse != null) { // This will log a conditional cache miss only. listener.cacheMiss(call) } } }
if (HttpMethod.invalidatesCache(networkRequest.method)) { try { cache.remove(networkRequest) } catch (_: IOException) { // The cache cannot be written. } } }
/**
 * Returns true when a URL is already buffered in `nextUrl` or another one can be read from the
 * delegate; snapshots whose metadata cannot be read are skipped.
 */
override fun hasNext(): Boolean {
  if (nextUrl != null) return true

  canRemove = false // Prevent delegate.remove() on the wrong item!
  while (delegate.hasNext()) {
    try {
      delegate.next().use { snapshot ->
        val metadata = snapshot.getSource(ENTRY_METADATA).buffer()
        nextUrl = metadata.readUtf8LineStrict()
        return true
      }
    } catch (_: IOException) {
      // We couldn't read the metadata for this snapshot; possibly because the host filesystem
      // has disappeared! Skip it.
    }
  }

  return false
}
/**
 * Returns the buffered URL, clears the buffer and marks the element as removable.
 *
 * @throws NoSuchElementException if [hasNext] reports that nothing is left.
 */
override fun next(): String {
  if (!hasNext()) throw NoSuchElementException()
  val result = nextUrl!!
  nextUrl = null
  canRemove = true
  return result
}
/** Supplies a follow-up request carrying credentials in answer to an authentication challenge. */
fun interface Authenticator {
  /**
   * Returns a request that includes a credential to satisfy an authentication challenge in
   * [response]. Returns null if the challenge cannot be satisfied.
   *
   * The route is best effort, it currently may not always be provided even when logically
   * available. It may also not be provided when an authenticator is re-used manually in an
   * application interceptor, such as when implementing client-specific retries.
   */
  @Throws(IOException::class)
  fun authenticate(route: Route?, response: Response): Request?

  companion object {
    /** An authenticator that knows no credentials and makes no attempt to authenticate. */
    @JvmField
    val NONE: Authenticator = AuthenticatorNone()

    /** Always answers null, i.e. never produces a follow-up request. */
    private class AuthenticatorNone : Authenticator {
      override fun authenticate(route: Route?, response: Response): Request? = null
    }

    /** An authenticator that uses the java.net.Authenticator global authenticator. */
    @JvmField
    val JAVA_NET_AUTHENTICATOR: Authenticator = JavaNetAuthenticator()
  }
}
Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that share the same Address may share a Connection. This class implements the policy of which connections to keep open for future use.
/**
 * Scans the pooled connections, evicts at most one idle connection and reports when to run again.
 *
 * @param now the current time, in the same unit as `idleAtNs` and `keepAliveDurationNs`
 *   (presumably nanoseconds, going by the `Ns` suffix; confirm against the caller).
 * @return `0` when a connection was evicted or the chosen candidate turned out not to be
 *   evictable (run again immediately); the time left until the longest-idle connection reaches
 *   the keep-alive duration when idle connections exist; the keep-alive duration when every
 *   connection is in use; `-1` when there are no connections.
 */
fun cleanup(now: Long): Long {
  var inUseConnectionCount = 0
  var idleConnectionCount = 0
  var longestIdleConnection: RealConnection? = null
  var longestIdleDurationNs = Long.MIN_VALUE

  // Find either a connection to evict, or the time that the next eviction is due.
  for (connection in connections) {
    synchronized(connection) {
      // If the connection is in use, keep searching.
      if (pruneAndGetAllocationCount(connection, now) > 0) {
        inUseConnectionCount++ // Count connections that are in use.
      } else {
        idleConnectionCount++ // Count idle connections.

        // If the connection is ready to be evicted, we're done.
        // Track the connection that has been idle for the longest time.
        val idleDurationNs = now - connection.idleAtNs
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs
          longestIdleConnection = connection
        } else {
          // This `if` is the last expression of the synchronized lambda, so it needs an else.
          Unit
        }
      }
    }
  }

  when {
    // The longest idle time has reached the keep-alive duration, or there are more idle
    // connections than allowed: evict right away.
    longestIdleDurationNs >= this.keepAliveDurationNs ||
      idleConnectionCount > this.maxIdleConnections -> {
      // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it.
      val connection = longestIdleConnection!!
      synchronized(connection) {
        if (connection.calls.isNotEmpty()) return 0L // No longer idle.
        if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest.
        // Still idle and still the oldest: take it out of the pool.
        connection.noNewExchanges = true
        connections.remove(longestIdleConnection)
      }

      // Close the socket.
      connection.socket().closeQuietly()
      if (connections.isEmpty()) cleanupQueue.cancelAll()

      // Clean up again immediately.
      return 0L
    }

    idleConnectionCount > 0 -> {
      // A connection will be ready to evict soon.
      // Next run = keep-alive duration minus the longest idle duration.
      return keepAliveDurationNs - longestIdleDurationNs
    }

    inUseConnectionCount > 0 -> {
      // All connections are in use. It'll be at least the keep alive duration 'til we run
      // again.
      return keepAliveDurationNs
    }

    else -> {
      // No connections, idle or in use.
      return -1
    }
  }
}
连接创建
ExchangeFinder.kt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Connect. Tell the call about the connecting call so async cancels work. val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null }
/**
 * Wraps the connected raw socket in an [SSLSocket], performs the TLS handshake, verifies the
 * host name, checks the certificate pinner and, on success, stores the socket, its buffered
 * source and sink, and the negotiated protocol.
 *
 * @throws SSLPeerUnverifiedException if the host name verifier rejects the session.
 */
private fun connectTls(connectionSpecSelector: ConnectionSpecSelector) {
  val address = route.address
  val sslSocketFactory = address.sslSocketFactory
  var success = false
  var sslSocket: SSLSocket? = null
  try {
    // Create the wrapper over the connected socket.
    sslSocket = sslSocketFactory!!.createSocket(
      rawSocket, address.url.host, address.url.port, true /* autoClose */
    ) as SSLSocket

    // Configure the socket's cipher suites, TLS versions and extensions (ALPN).
    val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket)
    if (connectionSpec.supportsTlsExtensions) {
      Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols)
    }

    // Handshake.
    sslSocket.startHandshake()
    // block for session establishment
    val sslSocketSession = sslSocket.session
    val unverifiedHandshake = sslSocketSession.handshake()

    // Verify that the socket's certificates are acceptable for the target host.
    if (!address.hostnameVerifier!!.verify(address.url.host, sslSocketSession)) {
      val peerCertificates = unverifiedHandshake.peerCertificates
      if (peerCertificates.isNotEmpty()) {
        val cert = peerCertificates[0] as X509Certificate
        throw SSLPeerUnverifiedException("""
            |Hostname ${address.url.host} not verified:
            |    certificate: ${CertificatePinner.pin(cert)}
            |    DN: ${cert.subjectDN.name}
            |    subjectAltNames: ${OkHostnameVerifier.allSubjectAltNames(cert)}
            """.trimMargin())
      } else {
        throw SSLPeerUnverifiedException(
          "Hostname ${address.url.host} not verified (no certificates)"
        )
      }
    }

    // NOTE(review): this excerpt neither declares `certificatePinner` nor assigns `handshake`
    // before the check below; confirm both are set up in the full source.
    // Check that the certificate pinner is satisfied by the certificates presented.
    certificatePinner.check(address.url.host) {
      handshake!!.peerCertificates.map { it as X509Certificate }
    }

    // Success! Save the handshake and the ALPN protocol.
    val maybeProtocol = if (connectionSpec.supportsTlsExtensions) {
      Platform.get().getSelectedProtocol(sslSocket)
    } else {
      null
    }
    socket = sslSocket
    source = sslSocket.source().buffer()
    sink = sslSocket.sink().buffer()
    protocol = if (maybeProtocol != null) Protocol.get(maybeProtocol) else Protocol.HTTP_1_1
    success = true
  } finally {
    if (sslSocket != null) {
      Platform.get().afterHandshake(sslSocket)
    }
    if (!success) {
      sslSocket?.closeQuietly()
    }
  }
}
// Attempt to reuse the connection from the call. // val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { // ..... // 如果call 本身含有connection,并且非空直接返回 if (call.connection != null) { check(toClose == null) return callConnection } // ..... }
// We need a new connection. Give it fresh stats. refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0
// 尝试从连接池中获取一个连接 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
// 加入proxy,selector获取连接 val routes: List<Route>? val route: Route if (nextRouteToTry != null) { // Use a route from a preceding coalesced connection. routes = null route = nextRouteToTry!! nextRouteToTry = null } elseif (routeSelection != null && routeSelection!!.hasNext()) { // Use a route from an existing route selection. routes = null route = routeSelection!!.next() } else { // Compute a new route selection. This is a blocking operation! var localRouteSelector = routeSelector if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } val localRouteSelection = localRouteSelector.next() routeSelection = localRouteSelection routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// 加入proxy等获取连接 if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result }
logElapsed(task, task.queue!!) { var completedNormally = false try { // 执行 runTask(task) completedNormally = true } finally { // If the task is crashing start another thread to service the queues. if (!completedNormally) { backend.execute(this) } } } } } }
// 依据url加载cookie val cookies = cookieJar.loadForRequest(userRequest.url) // 将加载的cookie加入header if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) }
// 交由后续拦截器处理,获得response val networkResponse = chain.proceed(requestBuilder.build()) // 更新cookie cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean // 确保安全 synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() // 如果运行的请求数上限,或者host的请求数超过上限 if (runningAsyncCalls.size >= this.maxRequests) break// Max capacity. if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue// Host max capacity. // 满足运行条件 // 从ready队列移除 i.remove() // 累计host的请求数 asyncCall.callsPerHost.incrementAndGet() // 放入执行队列 executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 } // 执行所有的之前筛选出来的asyncCall for (i in0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) }
return isRunning }
1 2 3 4 5 6 7 8 9 10 11 12
/** Cancels every call held in the ready-async, running-async and running-sync collections. */
@Synchronized
fun cancelAll() {
  for (call in readyAsyncCalls) {
    call.call.cancel()
  }
  for (call in runningAsyncCalls) {
    call.call.cancel()
  }
  for (call in runningSyncCalls) {
    call.cancel()
  }
}
// Try each address for best behavior in mixed IPv4/IPv6 environments. val addresses = address.dns.lookup(socketHost) if (addresses.isEmpty()) { throw UnknownHostException("${address.dns} returned no addresses for $socketHost") }
eventListener.dnsEnd(call, socketHost, addresses)
for (inetAddress in addresses) { mutableInetSocketAddresses += InetSocketAddress(inetAddress, socketPort) } } }
privatefunconnectTls(connectionSpecSelector: ConnectionSpecSelector) { val address = route.address val sslSocketFactory = address.sslSocketFactory var success = false var sslSocket: SSLSocket? = null try { // 创建用于tls的socket sslSocket = sslSocketFactory!!.createSocket( rawSocket, address.url.host, address.url.port, true/* autoClose */) as SSLSocket
// 配置socket连接 val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket) if (connectionSpec.supportsTlsExtensions) { Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols) }
// tls握手 sslSocket.startHandshake() val sslSocketSession = sslSocket.session val unverifiedHandshake = sslSocketSession.handshake()
// 校验hostname if (!address.hostnameVerifier!!.verify(address.url.host, sslSocketSession)) { val peerCertificates = unverifiedHandshake.peerCertificates if (peerCertificates.isNotEmpty()) { val cert = peerCertificates[0] as X509Certificate throw SSLPeerUnverifiedException(......) } else { throw SSLPeerUnverifiedException("Hostname ${address.url.host} not verified (no certificates)") } }
val certificatePinner = address.certificatePinner!!
/**
 * Stores [name] and [streams], creates the frame writer and the writer task, schedules the
 * periodic ping when a ping interval is configured, and finally creates the frame reader.
 *
 * @param name stored on this instance and used as the prefix of the ping task's name.
 * @param streams provides the source, the sink and the client flag for the reader and writer.
 */
fun initReaderAndWriter(name: String, streams: Streams) {
  val extensions = this.extensions!!
  synchronized(this) {
    this.name = name
    this.streams = streams
    // Writer wrapped around the stream's sink.
    this.writer = WebSocketWriter(
      isClient = streams.client,
      sink = streams.sink,
      random = random,
      perMessageDeflate = extensions.perMessageDeflate,
      noContextTakeover = extensions.noContextTakeover(streams.client),
      minimumDeflateSize = minimumDeflateSize
    )
    // Task that performs the writes.
    this.writerTask = WriterTask()
    // Task that sends the heartbeat (ping) frames.
    if (pingIntervalMillis != 0L) {
      val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
      taskQueue.schedule("$name ping", pingIntervalNanos) {
        writePingFrame()
        return@schedule pingIntervalNanos
      }
    }
    if (messageAndCloseQueue.isNotEmpty()) {
      runWriter() // Send messages that were enqueued before we were connected.
    }
  }

  // Reader wrapped around the stream's source.
  reader = WebSocketReader(
    isClient = streams.client,
    source = streams.source,
    frameCallback = this,
    perMessageDeflate = extensions.perMessageDeflate,
    noContextTakeover = extensions.noContextTakeover(!streams.client)
  )
}
读取数据
1 2 3 4 5 6 7
/** Keeps processing incoming frames for as long as `receivedCloseCode` is still -1. */
fun loopReader() {
  while (receivedCloseCode == -1) {
    // This method call results in one or more onRead* methods being called on this thread.
    reader!!.processNextFrame()
  }
}
@SynchronizedoverridefunonReadPing(payload: ByteString) { // Don't respond to pings after we've failed or sent the close frame. if (failed || enqueuedClose && messageAndCloseQueue.isEmpty()) return
@Synchronizedprivatefunsend(data: ByteString, formatOpcode: Int): Boolean { // Don't send new frames after we've failed or enqueued a close frame. // 确保没有异常 if (failed || enqueuedClose) returnfalse
// If this frame overflows the buffer, reject it and close the web socket. // 如果待发送数据长度大于指定大小(15m) // 由于write操作是先写入Buffer然后在发送给服务端,为了避免oom限制大小。 if (queueSize + data.size > MAX_QUEUE_SIZE) { close(CLOSE_CLIENT_GOING_AWAY, null) returnfalse }
internalfunwriteOneFrame(): Boolean { val writer: WebSocketWriter? val pong: ByteString? var messageOrClose: Any? = null var receivedCloseCode = -1 var receivedCloseReason: String? = null var streamsToClose: Streams? = null var readerToClose: WebSocketReader? = null var writerToClose: WebSocketWriter? = null
synchronized(this@RealWebSocket) { if (failed) { returnfalse// Failed web socket. }
try { if (pong != null) { writer!!.writePong(pong) } elseif (messageOrClose is Message) { // 发送请求 val message = messageOrClose as Message // 写入message帧 writer!!.writeMessageFrame(message.formatOpcode, message.data) synchronized(this) { queueSize -= message.data.size.toLong() } } elseif (messageOrClose is Close) { val close = messageOrClose as Close writer!!.writeClose(close.code, close.reason)
// We closed the writer: now both reader and writer are closed. if (streamsToClose != null) { listener.onClosed(this, receivedCloseCode, receivedCloseReason!!) } } else { throw AssertionError() }