OKhttp网络框架基础学习http三次握手底层实现

tcp的三次握手及四次挥手相信很多人都比较熟悉,而在Okhttp中,也有对于连接的管理部分。本文主要分析的是Okhttp是如何管理TCP三次握手四次挥手的,或者说Okhttp的连接管理。

本文基于okhttp 3.14.9

gradle依赖:implementation group: ‘com.squareup.okhttp3’, name: ‘okhttp’, version: ‘3.14.9’

Okhttp的拦截器链(责任链)

Okhttp最为经典的就是以责任链模式设计的拦截器链。Okhttp内部是有线程池触发每个RealCall对象继而触发网络请求的,如果您是异步调用的网络请求RealCall.enqueue(),在AsyncCall.execute()方法中就会调用个以下方法。即组装责任链链路interceptors集合,并发起网络请求。值得一题的是在整个过程中都是通过chain.proceed(originalRequest)的形式将责任链遍历。而本文的主角ConnectInterceptor就由此诞生,该interceptor就是负责http连接的

// RealCall.java 210行
Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
 
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
 
    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

Okhttp的连接管理

ConnectInterceptor

// ConnectInterceptor.java
/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;
 
  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }
 
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();
 
    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
 
    return realChain.proceed(request, transmitter, exchange);
  }
}

通过源码可知,ConnectInterceptor的代码比较简单,简单可理解为:新建一个Exchange对象,然后往下传递,这时客户端与服务器的连接已经建立,接下来将会是真正的数据传递。所以这里的重点就在transmitter.newExchange(chain, doExtensiveHealthChecks)里面了。

简单介绍一下Transmitter和Exchange

  • Transmitter:OkHttp的应用层和网络层之间的桥梁。里面包含着OkHttpClient对象、RealConnectionPool连接池、该次请求的Call对象、事件监听的EventListener对象、AsyncTimeout对象用于管理超时。Transmitter对象可以理解为一个管理类,关联着整个网络请求的应用层网络层的对接
  • Exchange:管理网络请求的requestresponse,可以理解为网络层,Transmitter中也包含这此对象。实际上,真正处理网络请求的request和response的是其里面的ExchangeCodec对象

从结果来看,一个Exchange对象就代表着一个与服务器的连接。带着这个结论我们继续往下看。

Transmitter.newExchange

// Transmitter.java 158行
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }
 
    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
 
    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
 }

newExchange方法的核心就是调用exchangeFinder.find()方法找到一个合适的连接,即找到一个合适的ExchangeCodec对象继而生成一个Exchange对象返回

ExchangeFinder.find

  // ExchangeFinder.java 79行
  public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();
 
    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }

重点关注的是 RealConnection resultConnection = findHealthyConnection();,这里可以理解为通过findHealthyConnection()获取可用的RealConnection对象,再借助RealConnection新建一个ExchangeCodec对象用于网络请求的交互。

RealConnection 是一次连接的抽象。

Exchange or ExchangeCodec 为负责http请求的request和reponse的交互。

所以,要想了解Okhttp如何管理连接的,就需要看RealConnection的查找过程

查找RealConnection

在Okhttp中是有利用连接池复用连接的设计的。RealConnectionPool就是担任着这一角色,其内部维护了一个双端队列Deque connections用来缓存可复用的连接。通过maxIdleConnections最大空闲连接数以及keepAliveDurationNs连接保活时长这俩参数来控制连接的释放。RealConnectionPool内部维护一个线程池,定期释放无用的RealConnectionPool连接。 回到查找可用RealConnection的逻辑当中,之前讲到的findHealthyConnection方法只是对查找过程findConnection的一次封装,它会死循环,直到获取到可用的RealConnection为止

/**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);
 
      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          return candidate;
        }
      }
 
      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges();
        continue;
      }
 
      return candidate;
    }
  }

然后就正式进入查找阶段了,findConnection方法:

  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    // 查找RealConnection结果
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; // This is a fresh attempt.
 
      。。。
      
      // [1] transmitter对象本身记录的RealConnection可复用
      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection;
        releasedConnection = null;
      }
      
      if (result == null) {
        // Attempt to get a connection from the pool.
        // [2] 通过当前请求地址在连接池connectionPool中寻找到可复用的RealConnection
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry;
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    
    。。。
    
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }
 
    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }
 
    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
 
      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection.getAll();
        // [3] 遍历全局的路由路径,在连接池connectionPool中寻找到可复用的RealConnection
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }
 
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }
 
        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        // [4] 若前面[1][2][3]都未找到时会新建一个RealConnection对象,后续请求服务器连接。
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }
 
    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }
 
    // Do TCP + TLS handshakes. This is a blocking operation.
    // [4] 前面[1][2][3]都未找到时,当前的result为全新RealConnection对象,
    // 这里的connect方法就会进行TCP的3次握手,和SSL/TLS。
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());
 
    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;
 
        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute;
      } else {
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);
      }
    }
    closeQuietly(socket);
 
    eventListener.connectionAcquired(call, result);
    return result;
  }

findConnection方法较长,可以寻找注释中的[1]、[2]、[3]、[4],即为每种获取到可用RealConnection对象情况。

  • [1] transmitter对象本身记录的RealConnection可复用。
  • [2] 通过当前请求地址在连接池connectionPool中寻找到可复用的RealConnection。
  • [3] 遍历全局的路由路径,在连接池connectionPool中寻找到可复用的RealConnection。ps:这里与[2]不同的是,会遍历到其他的请求地址,判断的依据可以是相同的host等。
  • [4] 如果前3步都找不到时,就会新建一个连接进行TCP+TLS了。

值得一提的是,每次查找连接池中可复用的连接都会调用connectionPool.transmitterAcquirePooledConnection()方法(只是传的参数不同),在此方法内部是会将查找到的RealConnection对象备份一个引用到transmitter内。而如果是新建连接的话也会通过transmitter.acquireConnectionNoEvents(result)将新的RealConnection对象备份。这样做的目的是为了重试时更快的找到可复用的连接,也对应会上述的[1]中查找情况

执行三次握手

根据上述findConnection方法中的result.connect,跟踪代码会看到,最终走到RealConnection.connectSocket()

// RealConnection.java
private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
 
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);
 
    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
 
    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }

到了这一步,RealConnection内部就会新建一个Socket对象,用于后续的tcp连接通信。

Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);就是真正执行三次握手的地方,这里的Platform.get()是Okhttp根据平台(ps:可能不是Android平台或者Android版本)的适配逻辑。

// Platform.java
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
      throws IOException {
    socket.connect(address, connectTimeout);
  }

显然,Okhttp内部的tcp三次握手,依赖的是传统的socket连接

ps:上述代码中会利用socket对象创建一个source和一个sink对象,这个是Okio的东西,如果请求是Http1协议的话就会用到。后续request和reponse就是利用它们来实现。

最后还需要补充的是,上述代码中在找到可用的RealConnection对象后,会利用它新建一个ExchangeCodec对象:resultConnection.newCodec(client, chain);。其实ExchangeCodec是一个接口,会根据所用协议为Http1或者Http2实例出对应的策略类。

// RealConnection.java
ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
    if (http2Connection != null) {
      return new Http2ExchangeCodec(client, this, chain, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1ExchangeCodec(client, this, source, sink);
    }
  }

这里如果是Http1的话,上述利用socket对象创建sourcesink对象就会被引用到Http1ExchangeCodec当中。

Okhttp的连接释放

说完了三次握手的管理,再来看看四次挥手,即连接释放。在源码中会看到多处地方调用一个closeQuietly方法,该方法是用于释放连接的。

// Utils.java
public static void closeQuietly(Socket socket) {
    if (socket != null) {
      try {
        socket.close();
      } catch (AssertionError e) {
        if (!isAndroidGetsocknameError(e)) throw e;
      } catch (RuntimeException rethrown) {
        throw rethrown;
      } catch (Exception ignored) {
      }
    }
  }

socket.close();就是针对这一次tcp连接的关闭(释放)。这里其实就是真正的连接释放了。而至于常规触发这一逻辑的地方就是前面提到的RealConnectionPool定时回收机制

本文主要介绍的是Okhttp如何管理tcp的连接,三次握手和四次挥手发生的地方。有关OKhttp的更多深入学习,大家可以参考由网易大佬整理《OKhttp学习手册》帮助大家共同进步;才能更好助力Android开发的明天!

最后

最后在总结一下文中设计的类的职能吧!

  • ConnectInterceptor:Okhttp用于与服务器建立连接的拦截器,连接发起的地方。
  • Transmitter:Okhttp应用层与网络层的中间层。这里利用其来获取到Exchange对象。
  • ExchangeFinder:用于查找可用连接。查找可用RealConnection对象,实例出ExchangeCodec对象,以及返回Exchange对象都会在这里完成。
  • Exchange:封装http的request和response的事件触发,中间还会涉及事件监听的回调。内部拥有该次连接的ExchangeCodec对象。
  • ExchangeCodec:真正用于http的request和response的事件。内部使用的是Okio。
  • RealConnection:一次连接的抽象,内部用于本次tcp连接的socket对象。可在Transmitter中、RealConnectionPool连接池或者新建获取,此步骤发生在ExchangeFinder中。
  • RealConnectionPool:用于缓存可复用的RealConnection对象。

本文转载于

作者:Cy13er

原文地址:https://juejin.cn/post/6959882254076084237

相关文章

学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习...
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面...
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生...
Can’t connect to local MySQL server through socket \'/v...
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 ...
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服...