OkHttp源码分析:分发器任务调配,拦截器责任链设计,连接池socket复用

news2025/1/24 11:39:18

目录

一,分发器和拦截器

二,分发器处理异步请求

1.分发器处理入口

2.分发器工作流程

3.分发器中的线程池设计

三,分发器处理同步请求

四,拦截器处理请求

1.责任链设计模式

 2.拦截器工作原理

3.OkHttp五大拦截器


一,分发器和拦截器

        OkHttp在内部维护了这几个重要对象:分发器dispatcher,连接池connectionPool,拦截器Interceptor;

//拦截器
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

//连接池
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
  
//拦截器
@get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()

@get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()

他们的作用分别为:

  • 分发器Dispatcher:调配请求任务,内部维护队列线程池  
  • 拦截器:处理请求与响应,完成请求过程
  • 连接池:管理socket连接与连接复用

        从OkHttp的请求处理流程来看: 拦截器负责完成网络请求过程,同步和异步请求必须经过分发器调配后才会发给拦截器进行网络请求;

二,分发器处理异步请求

1.分发器处理入口

private void visitInternet() {
    //1.创建HttpClient对象
    OkHttpClient okHttpClient = new OkHttpClient();
    //2.获取request对象
    Request.Builder builder = new Request.Builder()
            .url("https://www.bilibili.com/");
    Request request = builder.build();
    //3.获取call对象
    Call call = okHttpClient.newCall(request);
    //4.执行网络操作
    try {
        Response response = call.execute();
        String result = response.body().string();
        showResultOnUiThread(result);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

        从OkHttp处理流程来看,每次发送请求前我们需要调用 newCall() 方法获取call对象,这里的Call是一个接口,newCall返回的是Call接口的实现类RealCall;

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

call对象只能使用一次 

        发起异步请求需要调用call对象的 enqueue() 方法,enqueue方法首先会将call对象中的executed字段置为true,代表这个call对象已经使用过,第二次就无法使用,想要再次使用的话需要调用call对象的 clone() 方法;

        callStart方法执行后表示请求开始,之后便会执行分发器的enqueue方法处理异步请求,这里传入的对象AsyncCall是Runnable接口的实现类,可以理解为是我们要处理的异步任务;

  override fun enqueue(responseCallback: Callback) {
    //call对象只能使用一次
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart() //请求开始
    
    //分发器处理异步请求
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

2.分发器工作流程

分发器中维护了三个队列:

  • readyAsyncCalls:等待中异步请求队列
  • runningAsyncCalls:执行中异步请求队列
  • runningSyncCalls:执行中同步请求队列

        分发器dispatcher的enqueue方法执行后,异步请求AsyncCall默认先放到readAsyncCalls中,如果是非websocket连接,则检查一下runningAsyncCalls和readAsyncCalls中是否有相同域名host的请求,如果有则复用之前的域名的计数器existingCall

        计数器之后用于判断同一主机(域名)请求连接数

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

检查完之后调用promoteAndExecute()方法,在这个方法中会检查两件事:

  • 进行中异步请求数是否 ≥ 64(runningAsyncCalls队列的size是否 ≥ 64),
  • 对同一域名(主机)的请求callsPerHost是否大于5;

若条件符合,将异步任务加入到runningAsyncCalls中

检查完可执行请求并更新状态后,将请求提交到线程池中执行

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

          //检查可执行请求
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    //提交到线程池中执行
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService //线程池)
    }

    return isRunning
  }

将AsyncCall提交到线程池后,AsyncCall对象的run方法便会被执行;

在run方法中,从拦截器中获取了服务器的响应,完成请求后调用dispatcher的finish方法,结束本次异步请求;

override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
            //拦截器完成请求,返回响应
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
            //调用finish方法,结束本次异步请求
          client.dispatcher.finished(this)
        }
      }
    }

在完成一次请求后,runningAsyncCalls队列会空出位置

所以在finish方法中,会重新调用检查异步任务方法promoteAndExecute(),也就是在结束一次请求后,会去检查readyAsyncCalls队列中符合条件的异步任务,并去执行他们

idleCallback.run() 用于在所有请求完成后执行特定操作,操作内容自定义

internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    //重新调用promoteAndExecute,检查可执行异步请求
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
        //用于在所有请求完成后执行特定操作,操作内容自定义
      idleCallback.run()
    }
  }

3.分发器中的线程池设计

分发器中的线程池:

  • 核心线程数:0
  • 最大线程数:Int.MAX_VALUE
  • 空闲时间:60s
  • 工作队列:SynchronousQueue()
@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(
            0, //核心线程数
            Int.MAX_VALUE, //最大线程数
            60, //空闲时间
            TimeUnit.SECONDS, //空闲时间单位(秒)
            SynchronousQueue(), //工作队列
            threadFactory("$okHttpName Dispatcher", false)
        )
      }
      return executorServiceOrNull!!
    }

线程池工作原理:

  1. 工作中线程 < 核心线程数 创建新线程
  2. 工作中线程 > 核心线程数且工作队列未满,加入工作队列
  3. 工作队列已满,工作中线程数若 < 最大线程数, 创建新线程
  4. 工作队列已满,工作中线程数 > 最大线程数, 执行拒绝策略(默认为抛出异常,可自定义)

在okhttp的分发器中,线程池使用SynchronousQueue()作为工作队列,这种容器没有容量,也就无法添加任务,所以当工作中线程 > 核心线程数,会直接创建新线程

三,分发器处理同步请求

对于同步请求,分发器只记录请求(放入RunningSyncCalls中)

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

//dispatcher.executed()
  @Synchronized internal fun executed(call: RealCall) {
       //分发器只记录同步请求
    runningSyncCalls.add(call)
  }

四,拦截器处理请求

1.责任链设计模式

OkHttp中的拦截器采用责任链设计模式:

        为避免请求发送者与多个请求处理者耦合在一起,于是将所有请求处理者通过前一对象记住下一对象的引用而形成一条链,当有请求发生时,请求只需沿着链传递,直到有对象处理它

模拟责任链设计模式:

我们定义一个Handler抽象类,并让他持有下一Handler对象的引用next,并创建Handler三个子类

abstract class Handler {

    protected var next : Handler? = null;

    fun setNext(next : Handler){
        this.next = next;
    }

    fun getNext() : Handler?{
        return next;
    }

    abstract fun handle(request : String);
}

class Handler1 : Handler() {

    override fun handle(request: String) {
        if("1".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

class Handler2 : Handler() {
    override fun handle(request: String) {
        if("2".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

class Handler3 : Handler() {
    override fun handle(request: String) {
        if("3".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

        我们让handler1拥有2的引用,2拥有3的引用,这样当我们调用1的handle("3")时,request对象就会一直沿着责任链执行,直到遇到能处理他的对象(handler3)

val handler1: Handler = Handler1()
val handler2: Handler = Handler2()
val handler3: Handler = Handler3()

handler1.setNext(handler2)
handler2.setNext(handler3)

handler1.handle("3")

 2.拦截器工作原理

拦截器的工作基本分为三步:

  1. 处理请求request
  2. 将请求传往下一拦截器,获取返回的请求response
  3. 处理响应response并返回

例如,我们自定义一个日志打印拦截器:

class LogInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        //1.处理请求
        val request = chain.request();

        val requestLog = StringBuilder().apply {
            append("Request:\n")
            append("URL: ${request.url}\n")
            append("Method: ${request.method}\n")
            append("Headers: ${request.headers}\n")
            request.body?.let {
                append("Body: ${it.toString()}\n")
            }
        }
        Log.d("OkHttp", requestLog.toString())

        //将请求传往下一拦截器,获取响应
        val response = chain.proceed(request)

        //处理响应并返回
        val responseLog = StringBuilder().apply {
            append("Response:\n")
            append("Code: ${response.code}\n")
            append("Headers: ${response.headers}\n")
            response.body?.let {
                append("Body: ${it.string()}\n")
            }
        }
        Log.d("OkHttp", responseLog.toString())

        return response;
    }
}

在chain的proceed方法中,程序会找到拦截器链中的下一拦截器并将请求传给他,获取返回的请求

  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // 找到拦截器链中的下一拦截器
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    //传递请求,获取响应
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

3.OkHttp五大拦截器

OkHttp中默认配置五个拦截器,分别为:

val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
    interceptors += client.networkInterceptors
}
nterceptors += CallServerInterceptor(forWebSocket)
  • 重试和重定向拦截器 RetryAndFollowUpInterceptor:重试拦截器在交出前(交给下一个拦截器),负责判断用户是否取消了请求。在获得了响应之后,会根据响应码判断是否需要重定向,如果满足所有条件就会重启执行所有拦截器
  • 桥接拦截器(处理请求头和响应头)BridgeInterceptor:在交出之前,负责将Http协议必备的请求头加入请求之中(如Host,Connection),并添加一些默认的行为(如RZIP压缩);获得响应后调用保存cookie接口并解析GZIP数据
  • 缓存拦截器 CacheInterceptor:交出之前读取并判断是否使用缓存;获取响应后判断是否缓存
  • 连接拦截器 ConnectInterceptor:交出之前,负责创建或找到一个连接,并获取socket流;获取响应后不进行额外处理
  • 网络请求拦截器(执行实际的网络请求)CallServerInterceptor:进行真正的与服务器通信,向服务器发送数据,解析读取的响应数据

OkHttp中添加拦截器有两种方式:addInterceptor()和 addNetworkInterceptor(),他们的主要区别如下:

  • 调用时机:Application拦截器在请求开始时调用,Network在网络连接建立后调用
  • 调用次数:Application只调用一次,Network可能调用多次(重定向)
  • 可见信息:Application只能看到最终请求/响应,Network能看到所有中间请求/响应
  • 缓存感知:Application无法感知缓存,Network可以感知缓存
  • 使用场景:Application一般用于业务处理(如:身份验证,日志记录,错误处理),Network一般用于网络层操作(如:网络监控,缓存处理,压缩处理)

OkHttp完整拦截器链如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2262257.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP:如何修改已释放的请求

SAP:如何修改已释放的请求 QQ出了一个新功能&#xff0c;把10年前的旧日志推给自己。这个10年前的日志&#xff0c;是用户反映在SE10中把请求释放后发现漏了内容&#xff0c;想修改已释放的请求。经调查写了一个小程序&#xff0c;实现用户的需求。 *&-------------------…

python怎么循环嵌套

嵌套循环&#xff1a; 概念&#xff1a;循环中再定义循环&#xff0c;称为嵌套循环&#xff1b; 【注意】嵌套循环可能有多层&#xff0c;但是一般我们实际开发最多两层就可以搞定了(99%的情况) 格式&#xff1a; 1、while中套while常用 2、while中套for in 3、for in中套…

前端优雅(装逼)写法(updating····)

1.>>右位移运算符取整数 它将一个数字的二进制位向右移动指定的位数&#xff0c;并在左侧填充符号位&#xff08;即负数用1填充&#xff0c;正数用0填充&#xff09;。 比如 2.99934 >> 0&#xff1a;取整结果是2&#xff0c;此处取整并非四舍五入 2.99934 会先…

MySQL -- 库的相关操作

目录 查看数据库 创建数据库 直接创建&#xff1a; 加约束条件 if not exists 字符集和校对规则 什么是字符集 什么是校对规则 校对规则的主要功能 校对规则的特性 查看指定的数据库使用的字符集和校对规则&#xff1a; 比较是否区分大小写字母差异 显示创建语句 …

Moretl开箱即用日志采集

永久免费: 至Gitee下载 使用教程: Moretl使用说明 使用咨询: 用途 定时全量或增量采集工控机,电脑文件或日志. 优势 开箱即用: 解压直接运行.不需额外下载.管理设备: 后台统一管理客户端.无人值守: 客户端自启动,自更新.稳定安全: 架构简单,兼容性好,通过授权控制访问. 架…

分享一次接口性能摸底测试过程

接口性能测试是用于验证应用程序中的接口是否可以满足系统的性能要求的一种测试方法。确定应用程序在各种负载条件下的性能指标&#xff0c;例如响应时间、吞吐量、并发性能等&#xff0c;以便提高系统的性能和可靠性。本文主要讲述接口性能测试从前期准备、方案设计到环境搭建…

【机器学习】机器学习的基本分类-无监督学习-t-SNE(t-分布随机邻域嵌入)

t-SNE&#xff08;t-分布随机邻域嵌入&#xff09; t-SNE&#xff08;t-distributed Stochastic Neighbor Embedding&#xff09;是一种用于降维的非线性技术&#xff0c;常用于高维数据的可视化。它特别适合展示高维数据在二维或三维空间中的分布结构&#xff0c;同时能够很好…

【教学类-83-03】20241218立体书盘旋蛇3.0——圆点蛇1(蚊香形)

背景需求&#xff1a; 制作儿童简易立体书贺卡 【教学类-83-01】20241215立体书三角嘴1.0——小鸡&#xff08;正菱形嘴&#xff09;-CSDN博客文章浏览阅读1k次&#xff0c;点赞24次&#xff0c;收藏18次。【教学类-83-01】20241215立体书三角嘴1.0——小鸡&#xff08;正菱形…

监控视频汇聚融合云平台一站式解决视频资源管理痛点

随着5G技术的广泛应用&#xff0c;各领域都在通信技术加持下通过海量终端设备收集了大量视频、图像等物联网数据&#xff0c;并通过人工智能、大数据、视频监控等技术方式来让我们的世界更安全、更高效。然而&#xff0c;随着数字化建设和生产经营管理活动的长期开展&#xff0…

JAVA 零拷贝技术和主流中间件零拷贝技术应用

目录 介绍Java代码里面有哪些零拷贝技术java 中文件读写方式主要分为什么是FileChannelmmap实现sendfile实现 文件IO实战需求代码编写实战IOTest.java 文件上传阿里云&#xff0c;测试运行代码看耗时为啥带buffer的IO比普通IO性能高&#xff1f;BufferedInputStream为啥性能高点…

云灾备技术

目录 云灾备分类与定义 云容灾定义与主要应用场景 云容灾定义 应用场景 云备份定义与主要应用场景 云备份定义 应用场景 云容灾参考模型与关键技术 云备份参考模型与关键技术 云灾备分类与定义 云容灾技术是指保护云数据中心业务持续性的灾备技术&#xff0c;它是云灾…

进程通信方式---共享映射区(无血缘关系用的)

5.共享映射区&#xff08;无血缘关系用的&#xff09; 文章目录 5.共享映射区&#xff08;无血缘关系用的&#xff09;1.概述2.mmap&&munmap函数3.mmap注意事项4.mmap实现进程通信父子进程练习 无血缘关系 5.mmap匿名映射区 1.概述 原理&#xff1a;共享映射区是将文件…

leetcode 面试经典 150 题:长度最小的子数组

链接长度最小的子数组题序号209题型数组解题方法滑动窗口难度中等 题目 给定一个含有 n 个正整数的数组和一个正整数 target 。找出该数组中满足其总和大于等于 target 的长度最小的 子数组 [numsl, numsl1, …, numsr-1, numsr] &#xff0c;并返回其长度。如果不存在符合条件…

代码随想录day22 | 回溯算法理论基础 leetcode 77.组合 77.组合 加剪枝操作 216.组合总和III 17.电话号码的字母组合

DAY22 回溯算法开始 学到目前最烧脑的一天 回溯算法理论基础 任何回溯算法都可以抽象成一个树结构 理论基础 什么是回溯法 回溯法也可以叫做回溯搜索法&#xff0c;它是一种搜索的方式。 在二叉树系列中&#xff0c;我们已经不止一次&#xff0c;提到了回溯 回溯是递归的副…

画一颗随机数

代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>codePen - Random Tree</title> </head> <body><canvas></canvas><script>const canvas doc…

牛客周赛 Round 72 题解

本次牛客最后一个线段树之前我也没碰到过&#xff0c;等后续复习到线段树再把那个题当例题发出来 小红的01串&#xff08;一&#xff09; 思路&#xff1a;正常模拟&#xff0c;从前往后遍历一遍去统计即可 #include<bits/stdc.h> using namespace std; #define int lo…

[x86 ubuntu22.04]投影模式选择“只使用外部”,外部edp屏幕无背光

1 问题描述 CPU&#xff1a;G6900E OS&#xff1a;ubuntu22.04 Kernel&#xff1a;6.8.0-49-generic 系统下有两个一样的 edp 屏幕&#xff0c;投影模式选择“只使用外部”&#xff0c;内部 edp 屏幕灭&#xff0c;外部 edp 屏幕无背光。DP-1 是外部 edp 屏幕&#xff0c;eDP-1…

清理C盘小记

突然C盘就爆满了&#xff0c;想当初还是给他预留了120G的空间&#xff0c;感觉到现在也不够用了&#xff0c;担心出现死机的情况就赶紧进行了清理。有一说一&#xff0c;清理回收站是真的有用。 参考&#xff1a;C盘清理指南&#xff0c;清理出30G起&#xff0c;超详细总结&am…

Docker:Docker Compose(补充三)

Docker&#xff1a;Docker Compose 1. Docker Compose 批量管理容器的工具 1. Docker Compose 批量管理容器的工具 Docker Compose 是一个用于定义和运行多容器 Docker 应用程序的工具。通过一个 YAML 文件来配置应用服务&#xff0c;它允许用户编排、组合和配置多个容器的部署…