OkHttp源码分析(二)OkHttpClient、Request、Call、Dispatcher详解

这篇文章主要介绍OkHttpClient、Request、Call、Dispatcher等类

文章基于OkHttp3.14.3版本

0. 说明

上一篇OkHttp源码分析(一)请求和响应过程简单分析中我们简单分析了OkHttp从请求到响应的过程,这篇就来深入学习下其中涉及到的比较关键的类:

  • OkHttpClient: OkHttp客户端参数配置、Call工厂
  • Request: 请求链接、方法、参数配置
  • Call、Realcall: 请求任务
  • Dispatcher: 任务调度器

1. OkHttpClient

OkHttpClient类主要应用了[外观模式]()和[建造者模式]()两种设计模式来设计,结合外观模式的思想,将许多对应OkHttp中各个功能模块的对象包含到类中,并为这些功能对象的配置提供了共同的对外接口。同时使用建造者模式,提供一个Builder类为这些众多的功能模块提供链式的配置方式,使得繁杂的功能模块配置变得简洁。首先,创建一个OkHttpClient.builder对象,接着按需设置builder各个参数:

OkHttpClient.Builder builder = new OkHttpClient.Builder();

1.1 超时、失败重连

OkHttpClient中超时时间参数有以下几种:

  • callTimeout: 设置完整的请求过程超时时间。该参数计算的是整个请求过程的时间:从解析DNS、与Server建立连接、发送请求、Server响应处理到读取请求结果。同时,如果请求中包含重定向和失败重连,这两个过程执行时间也包含在callTimeOut计时时间内。取值范围:0~Integer.MAX_VALUE,其中0是默认值,表示不设置超时时间;
// 设置整个请求过程最大超时时间为60s
builder.callTimeout(Duratino.ofSeconds(60));
  • connectTimeout: 设置连接建立的超时时间。该参数计算的是建立与服务端之间tcp socket连接的时间。取值范围:0~Integer.MAX_VALUE,其中0表示不设置超时时间,默认值为10s
// 设置连接建立超时时间
builder.connectTimeout(Duration.ofSeconds(10));
  • readTimeout: 设置连接的IO读操作超时时间。该参数应用于请求中的TCP socket和各个IO读操作,包括对Source和Response的读操作。其中0表示不设置超时时间,默认值为10s
// 设置读超时时间
builder.readTimeout(Duration.ofSeconds(10));
  • writeTimeout: 设置连接的IO写操作超时时间。该参数应用于请求中的各个IO写操作,其中0表示不设置超时时间,默认值为10s
// 设置写超时时间
builder.writeTimeout(Duration.ofSeconds(10));
  • retryOnConnectionFailure: 是否允许OkHttp自动执行失败重连,默认为true。当设置为true时,okhttp会在以下几种可能的请求失败的情况下恢复连接并重新请求:1.IP地址不可达;2.过久的池化连接;3.代理服务器不可达。
builder.retryOnConnectionFailure(true);

1.2 拦截器

OkHttpClient支持添加多个HTTP/HTTPS请求拦截器和WebSocket拦截器:

  • addInterceptor: 添加自定义的HTTP/HTTPS请求拦截器:
builder.addInterceptor(chain -> chain.proceed(chain.request()));
  • addNetworkInterceptor: 添加自定义的WebSocket请求拦截器,该方法添加的拦截器只在请求为websocket的情况下有效:
builder.addNetworkInterceptor(chain -> chain.proceed(chain.request()));

1.3 缓存和Cookie

OkHttp支持自定义缓存的路径和大小,以及Cookie的缓存处理:

  • cache: 设置缓存的路径和缓存空间大小,用于读取和写入已缓存的响应信息Response
// 设置缓存文件,用于将HTTP/HTTPS响应缓存到文件系统从而达到重用的目的以节省时间和网络带宽
builder.cache(new Cache(new File("cache_path"), 1024 * 1024))
  • cookieJar: 设置Cookie处理器,用于从HTTP响应中接收Cookie,并且可以将Cookie提供给即将发起的请求。该参数默认值为CookieJar.NO_COOKIES,即不处理Cookie。
// 将cookie缓存到内存中
builder.cookieJar(new CookieJar() {
    private final HashMap<String, List<Cookie>> cookieStore = new HashMap<>();
    @Override
    public void saveFromResponse(final HttpUrl url, final List<Cookie> cookies) {
        cookieStore.put(url.host(), cookies);
    }
    @Override
    public List<Cookie> loadForRequest(final HttpUrl url) {
        List<Cookie> cookies = cookieStore.get(url.host());
        return null != cookies ? cookies : new ArrayList<Cookie>();
    }
})

1.4 域名解析、重定向

  • dns: 设置域名解析服务。默认情况下,OkHttp内部使用系统提供的域名解析服务。也可以通过该方法设置自定义的域名解析规则,比如屏蔽某些域名请求、或强制解析到固定的IP下。
// 自定义dns解析,屏蔽百度用域名解析并使用系统提供的DNS解析服务解析其他域名
builder.dns(hostname -> {
    // 屏蔽百度链接
    if (hostname.contains("baidu.com")) {
        List<InetAddress> addresses = new ArrayList<>();
        addresses.add(InetAddress.getByAddress(new byte[]{(byte) 127, (byte) 0, (byte) 0, (byte) 1}));
        return addresses;
    }
    return Dns.SYSTEM.lookup(hostname);
})
  • followRedirects: 设置是否允许请求重定向,默认为true允许;
  • followSslRedirects: 设置是否允许HTTP与HTTPS请求之间互相重定向,默认为true允许。区别于HttpURLConnectiond呃这个选项默认是不允许的。
builder.followRedirects(true)
             .followSslRedirects(true);

1.5 ping心跳机制和协议设置

  • pingInterval: 设置ping信号发送时间间隔,该选项一般用于维持Websocket/Http2长连接,发送心跳包。默认值为0表示禁用心跳机制。
builder.pingInterval(Duration.ofSeconds(59));
  • protocols: 设置OkHttpClient使用的协议,默认为HTTP/2和HTTP/1.1
builder.protocols(Util.immutableList(Protocol.HTTP_2, Protocol.HTTP_1_1));

1.6 配置连接池、请求调度器

  • connectionPool: 手动配置连接池。当前OkHttp版本的连接池默认为容纳最大5个连接数,并在连接空闲超时5分钟后将其从池中移除。
  • connectionPool: 自定义连接池,当前OkHttp版本默认的连接池最大容纳5个连接数,并在连接空闲超过5分钟后将其回收,从连接池中移除。
// 手动配置连接池,其中ConnectionPool第一个参数表示池内容纳的最大连接
builder.connectionPool(new ConnectionPool(5, 5, TimeUnit.MINUTES));
  • dispatcher: 自定义请求任务调度器,多数情况下使用默认的请求调度器即可。当需要手动配置执行请求任务的线程池时可以通过此选项设置实现。
builder.dispatcher(new Dispatcher());
//手动配置执行请求任务的线程池
builder.dispatcher(new Dispatcher(Executors.newFixedThreadPool(64)));

1.7 为请求设置“上帝视角”:事件监听器

OkHttp中的EventListener,对于每次请求,犹如"上帝视角"般的存在:如果为OkHttpClient设置了EventListener,则一个请求从发起到结束的所有步骤都会被EventListener“看”到,请求的完整生命周期事件都会通过EventListener对应的接口回调给上层,因此,在开发debug阶段,或想要了解一个请求需要经历哪些流程时,也可以通过设置EventListener来获取相应信息。

  • eventListenerFactory: 为每一个Call指定单独的事件监听器。
// EventListener.NONE不监听任何事件
builder.eventListenerFactory(call -> EventListener.NONE);
  • eventListener: 为OkHttpClient设置统一的事件监听器,该选项对同一个OkHttpClient实例的所有Call都生效;实际上该选项内部也调用了eventListenerFactory方法,为每一个call设置了相同的事件监听器:
public Builder eventListener(EventListener eventListener) {
  if (eventListener == null) throw new NullPointerException("eventListener == null");
  this.eventListenerFactory = EventListener.factory(eventListener);
  return this;
}

1.8 代理设置

可以通过以下三个选项设置请求代理:

  • proxySelector: 设置代理选择策略
builder.proxySelector(ProxySelector.getDefault());
  • proxyAuthenticator: 设置代理身份验证
builder.proxyAuthenticator(Authenticator.NONE);
  • proxy: 设置使用指定代理。该选项优先级高于proxySelector
builder.proxy(Proxy.NO_PROXY);

1.9 socket相关

  • socketFactory: 设置自定义的用于创建socket连接的socket工厂对象,OkHttp默认使用无参的SocketFactory.createSocket()重载方法来创建未连接状态的socket。可以通过该参数设置实现将创建的socket绑定到特定的地址。
builder.socketFactory(SocketFactory.getDefault());

1.10 HTTPS相关

  • certificatePinner: 设置固定证书
// 设置默认固定证书
builder.certificatePinner(CertificatePinner.DEFAULT);
builder.connectionSpecs(Util.immutableList(ConnectionSpec.MODERN_TLS, ConnectionSpec.CLEARTEXT));
  • sslSocketFactory: 设置用于创建SSLSocket连接的工厂对象和X509信任管理:
builder.sslSocketFactory((SSLSocketFactory) SSLSocketFactory.getDefault(), Util.platformTrustManager());

最后,生成OkHttpClient对象:

OkHttpClient client = builder.build();

接下来看看Request类。

2. Request

Request 封装了请求的内容,包括链接、请求体参数、请求头等,以及请求tag,比较简单。Request也是通过Builder构建:

// Step 2. 构建一个Request用于封装请求地址、请求类型、参数等信息
Request request = new Request.Builder().get()
                                       .url("https://www.baidu.com")
                                       .build();

创建好OkHttpClientRequest之后,就可以生成请求任务,发起请求了,接下来看Call和其实现类RealCall

3. Call和RealCall

当前版本的OkHttp中,接口Call只有RealCall这一个实现类。Call表示一个已经准备好,可以执行的请求任务,Call执行时可以取消,但一个Call只能被执行一次。Call除了封装分别用于执行同步异步请求的execute()enqueue(callback)两个接口外,还封装了其他几个请求相关的接口:

  • Request request(); 获取发起本次请求任务的请求对象Request;
  • void cancel(); 取消正在执行的本次请求,已经结束的请求不能取消;
  • boolean isExecuted(); 判断请求任务是否已经执行过了,避免同个请求任务调用一次以上;
  • boolean isCanceled(); 判断请求是否已取消;
  • Timeout timeout(); 获取请求任务的超时时间对象,该参数对应于OkHttpClient.Builder.callTimeOut方法设置的超时参数;
  • Call clone(); 克隆本次请求为一个新的请求,如果同一个请求任务需要重复执行,可以通过该方法克隆出一个新的Call来实现。

接下来看看RealCall类源码。

我们在发起Http请求时会通过OkHttpClient和Request对象来创建Call:

// 创建一个新的请求任务Call
Call call = httpClient.newCall(request);

而httpClient.newCall()方法内部通过RealCall的静态方法newCall创建并返回一个RealCall对象,所以在执行同步/异步请求时实际调用的是RealCall中的实现方法:

// OkHttpClient.newCall 
@Override 
public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}

打开RealCall源码可以看到,RealCall内部持有本次请求的Request对象和OkHttpClient对象,同时还发现,RealCall还声明了一个Transmitter类型的对象并随着RealCall的创建而创建。Transmitter作为OkHttp的应用层和网络层的连接,负责对外暴露OkHttp中的高级应用程序层原语,包括连接、请求、响应和流等。结合RealCall源码可以发现,RealCall负责的是请求发起和执行,Transmitter则负责请求任务的状态、超时时间、生命周期事件的更新以及请求任务背后的连接、连接池的维护管理等。

/*RealCall.java:通过transmitter控制超时时间的计算、生命周期步骤更新、取消请求等*/
@Override public Response execute() throws IOException {
  // ... 忽略其他代码
  transmitter.timeoutEnter();
  transmitter.callStart();
  // ... 忽略其他代码
}

@Override public void enqueue(Callback responseCallback) {
 // ... 忽略其他代码
 transmitter.callStart();
 // ... 忽略其他代码
}

@Override public void cancel() {
  transmitter.cancel();
}

@Override public Timeout timeout() {
  return transmitter.timeout();
}

再看看clone()方法实现:

@SuppressWarnings("CloneDoesntCallSuperClone") // We are a final type & this saves clearing state.
@Override public RealCall clone() {
  return RealCall.newRealCall(client, originalRequest, forWebSocket);
}

可以看到,clone内部创建了一个新的Call,相当于调用了(RealCall)client.newCall(request),因此可以用这个克隆的Call继续发起请求。

RealCall还封装了个内部类AsyncCall用于执行异步请求,AsyncCall声明了CallBack变量用于回调通知异步请求结果,以及一个线程安全的AtomicInteger类型变量callsPerHost用于计量同一主机的请求数。通过上一篇的分析知道,AsyncCall是一个Runnable并且最终通过AsyncCall.execute()方法执行网络请求。那RealCall内部是怎样执行到这个这个方法的呢?我们之前是通过快速跳转实现的方式找到了这个方法的,而AsyncCall封装的异步请求任务是在RealCall.enqueue执行时被添加到Dispatch中的请求队列:

// RealCall.enqueue()
@Override public void enqueue(Callback responseCallback) {
 // ... 忽略无关代码
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

那么只要搞清楚Dispatcher.enqueue()背后的队列中的任务何时何地执行的,上面的问题就有答案了。

4. Dispatcher

Dispatcher是OkHttp中的请求任务调度器,内部维护了一个线程池和相关的请求队列用于实现高并发的异步请求:

public final class Dispatcher {
  // 最大并发请求数,默认为64个
  private int maxRequests = 64;
  // 相同服务器主机的最大并发请求数,默认为5个
  private int maxRequestsPerHost = 5;
  // 空闲回调,如果设置,则当该调度器空闲时(正在执行的任务数变为0)时回调通知idleCallback.run()
  private @Nullable Runnable idleCallback;
  // 执行异步任务的线程池
  private @Nullable ExecutorService executorService;
  // 存放已经准备就绪可以执行,但尚未执行的异步请求任务的双向队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 存放正在执行的异步请求任务,包括已经取消但未完全结束的请求的双向队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  // 存放正在执行的同步请求任务,包括已经取消但未完全结束的请求的双向队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  //...其他代码
}

其中线程池对象通过懒加载方式创建:

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                                             new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

可以看到,executorService实际创建的是一个无边界、核心线程数为0的线程池。其中池内线程空闲等待时长为60s,超过空闲时间自动结束;工作队列workQueue为SynchronousQueue类型的队列,该队列是一个同步队列,保证并发的任务顺序执行,且该类型队列内部不存储值,只作传递,由于executorService创建的线程数无限制,不会有队列等待,所以使用SynchronousQueue(参考Executors.newCachedThread()创建缓存线程池);

参考https://www.jianshu.com/p/074dff0f4ecb

SynchronousQueue每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量为0,严格说并不是一种容器,由于队列没有容量,因此不能调用peek等操作,因此只有移除元素才有元素,显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入者(生产者)传递给移除者(消费者),这在多任务队列中最快的处理任务方式。对于高频请求场景,无疑是最合适的。

在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。

接着第3节RealCall分析最后的问题,来看看Dispatcher.enqueue()方法的实现:

// Dispatcher.enqueue()
void enqueue(AsyncCall call) {
  synchronized (this) {
    // 将新的异步请求任务添加到readyAsyncCalls队列
    readyAsyncCalls.add(call);
        // 修改call,使其共享runningAsyncCalls或readyAsyncCalls中现有的请求相同主机的call.callsPerHost变量
    if (!call.get().forWebSocket) {
      AsyncCall existingCall = findExistingCallWithHost(call.host());
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
    }
  }
  // 执行AsyncCall请求调度策略,发起call请求
  promoteAndExecute();
}

对于新请求入列的异步请求任务call,首先将其添加到readyAsyncCalls队列,以表示这个call准备就绪,可以执行请求;接着修改这个call的callsPerHost属性为与先前添加的相同主机请求任务的call共享,从而实现对相同主机的请求计数,对相同主机的最大并发请求数进行限制。接着调用promoteAndExecute()方法,将readyAsyncCalls队列中的任务提升到runningAsyncCalls,并执行请求:

private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));
  // 创建可执行任务列表,用于筛选出readyAsyncCalls中可执行的任务
  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    // 遍历筛选readyAsyncCalls
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();
            // 如果runningAsyncCalls中的请求任务数超过最大并发请求数限制maxRequests则任务继续放在readyAsyncCalls中等待执行
      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      // 如果与asyncCall相同主机的请求数超过最大并发同主机请求数则,则不执行该请求
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
            // 从readyAsyncCalls中移除符合条件的请求任务
      i.remove();
      // 将asyncCall关联的主机请求数增1
      asyncCall.callsPerHost().incrementAndGet();
      // 加入可执行请求任务列表
      executableCalls.add(asyncCall);
      // 加入runningAsyncCalls任务队列
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }
    // 遍历执行请求任务
  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}

可以看到,Dispatcher请求调度器最终是在promoteAndExecute()方法中实现最大并发请求数量和最大并发同主机请求数量限制的。在方法的最后遍历执行请求任务,调用了每一个AsyncCall的executeOn()方法并将当前Dispatcher的线程池作为参数传入,在看看这个AsyncCall.executeOn(executorService)方法的实现:

// RealCall.AsyncCall.executeOn()
void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    // 通过executorService线程池执行请求任务
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    // 发生异常时关闭连接
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);
  } finally {
    // 最后如果请求任务执行失败则结束任务,从runningAsyncCalls中将任务移除
    if (!success) {
      client.dispatcher().finished(this); // This call is no longer running!
    }
  }
}

AsyncCall.executeOn()方法内部调用线程池的execute方法执行本次任务,由此触发AsyncCall父类的run方法,并执行到AsyncCall的execute()方法,完成了本次请求!因此,AsyncCall.execute()调用过程大致如下:

AsyncCall.execute调用过程

最后,调用Dispatcher.finished(call)方法结束本次请求:

@Override protected void execute() {
  // ... 忽略无关代码
  try {
    // ... 忽略无关代码
  } catch (IOException e) {
    // ... 忽略无关代码
  } finally {
    // 结束本次请求
    client.dispatcher().finished(this);
  }
}
// Dispatcher.finished(call)
void finished(AsyncCall call) {
  // 将call同主机并发请求数减1
  call.callsPerHost().decrementAndGet();
  // 结束本次请求任务,主动将call从runningAsyncCalls队列移除
  finished(runningAsyncCalls, call);
}

以上从Dispatcher.enqueue()开始到Dispatcher.finished(call)结束就是Dispatcher调度异步请求的过程。Dispatcher对同步请求的调度执行就简单多了,单线程任务,直接从RealCall.execute跟进分析即可。不管是同步还是异步请求,最终都是通过RealCall的getResponseWithInterceptorChain()方法完成请求和获取响应结果的,那getResponseWithInterceptorChain()方法内部是如何通过拦截器链完成请求的呢?下一篇就来分析分析OkHttp中的拦截器Interceptor。

5. The End :)

Last modification:September 22nd, 2019 at 08:57 pm
If you think my article is useful to you, please feel free to appreciate

Comment here is closed