Dubbo-聊聊Dubbo协议

news2025/1/10 17:22:41

前言

Dubbo源码阅读分享系列文章,欢迎大家关注点赞

SPI实现部分

  1. Dubbo-SPI机制

  2. Dubbo-Adaptive实现原理

  3. Dubbo-Activate实现原理

  4. Dubbo SPI-Wrapper

注册中心

  1. Dubbo-聊聊注册中心的设计

  2. Dubbo-时间轮设计

通信

  1. Dubbo-聊聊通信模块设计

什么是协议

在网络交互中是以字节流的形式传递的,对于字节流都是二进制格式,这样我们就面临一个问题就是如何转化为我们可以识别的字符,协议就是来解决这个问题的,协议用通俗易懂地解释就是通信双方需要遵循的约定。 在日常开发中,我们常见的网络传输协议有TCP、UDP、HTTP。常用的中间件也会定义对应的协议,如Redis、Mysql、Zookeeper等都有自己约定的协议,同样Dubbo的通信也采用一种协议,这些都是应用层协议,都是基于TCP或者UDP设计的。

如何定义协议

应用层协议一般的形式有三种:定长协议、特殊结束符和变长协议,聊到这里就可以抛出来一个常见的面试题,如何解决网络通信粘包和拆包的问题?该问题的解决方案也就是通过约定协议,下面我们就来聊聊这三种模式优缺点以及使用场景。

定长协议

定长的协议是指协议内容的长度是固定的,比如协议byte长度是50,当从网络上读取50个byte后,就进行decode解码操作。

优点

定长协议在读取或者写入时,效率比较高,因为数据大小都是确定的。

缺点

定长协议的缺点在于适应性不足,网络传输中传输的内容的大小不可能都是相同的,因此对于一些长度不够的消息,明显过于的浪费带宽。

特殊结束符

特殊结束符就是在每次传输结束的时候使用一个特殊的结束符,在Redis中的协议采用了特殊结束符,客户端和服务器发送的命令一律使用\r\n(CRLF)结尾。

优点

与定长协议一样读取或者写入时,效率比较高,同时解决定长协议的尴尬。

缺点

特殊结束符方式的问题是必须要有一个完整的消息体才能进行传输,除此之外必须要防止用户传输的数据不能同结束符相同,否则就会出现紊乱。

变长协议

变长协议由定长以及不定长两部分组成,定长部分一般是协议头,此部分会包含变长部分的描述,变长协议我们经常使用的HTTP协议采用变长协议,HTTP请求报文格式是由三部分组成:

  1. 请求行:包括Url、Version等,由空格分隔,\r\n结尾;

  2. 请求头:多行,每行是key:value的格式,以\r\n结尾;

  3. 请求体:请求头与请求体直接由一个空白行分隔,请求体的长度在请求头中由content-length给出;

优点

灵活性比较高,解决了定长协议以及特殊结束符的所有缺点。

缺点

复杂性比较高,需要自定义一套标准,所有消息都需要按照该格式发送以及解析。

Dubbo协议

Dubbo框架支持很多协议,默认采用Dubbo协议,Dubbo协议采用的是变长协议的设计,整体的格式如下:

  1. 0~7位和8~15位分别是Magic High和Magic Low,是固定魔数值(0xdabb),我们可以通过这两个Byte,判断是否为Dubbo协议;

  2. 16位是Req/Res标识,用于标识当前消息是请求还是响应;

  3. 17位是2Way标识,用于标识当前消息是单向还是双向,如果需要来自服务器的返回值,则设置为1;

  4. 18位是Event标识,用于标识当前消息是否为事件消息;

  5. 19~23位是序列化类型的标志,用于标识当前消息使用哪一种序列化算法;

  6. 24~31位是Status状态,用于记录响应的状态,当Req/Res为0时才有用;

  7. 32~95位是Request ID,用于记录请求的唯一标识;

  8. 96~127位是序列化后的内容长度,该值是按字节计算;

  9. 128位之后是可变的数据,被特定的序列化类型序列化后,每个部分都是一个 byte [] 或者byte,如果是请求包,则每个部分依次为:Dubbo version、Service name、Service version、Method name、Method parameter types、Method arguments 和 Attachments。如果是响应包,则每个部分依次为:返回值类型、返回值;

image.png

优点

Dubbo协议整体设计比较简洁,能采用1个bit表示的,不会用一个byte来表示;此外请求头和响应头一致,整体采用一套解析标准就可以,代码实现起来相对简单。

缺点

由于整体的设计相对简洁,导致扩展性不够;

Dubbo协议是如何解析的

在通信篇中我们讲过Codec2该接口,该接口提供了encode和decode个方法来实现消息与字节流之间的相互转换,关于该接口的实现我们没有讲解,这里我们来看看此部分和Dubbo协议有什么关系。

AbstractCodec抽象类没有实现Codec2中定义的接口方法,而是提供了几个给子类用的基础方法。

  1. getSerialization方法:通过SPI获取当前使用的序列化方式;

  2. checkPayload方法:检查编解码数据的长度,如果数据超长,会抛出异常;

  3. isClientSide、isServerSide方法:判断当前是Client端还是Server端;

接下来我们就来聊聊子类如何被解析的,我们可以看到四个子类的继承关系,重点介绍的是ExchangeCodec和DubboCodec,其他就是做一下简单介绍。 TransportCodec该类已经被标注为弃用,该类内部也就是根据getSerialization方法选择的序列化方法,对传入消息或ChannelBuffer进行序列化或反序列化。 TelnetCodec继承了TransportCodec的能力,该类主要是提供了对Telnet命令处理的能力,该功能主要是对服务进行治理的功能,这里后续我们画一点时间来进行介绍。

ExchangeCodec

ExchangeCodec继承了TelnetCodec,在该类基础上增加Dubbo协议头的处理能力,接下来我们首先来看下其核心字段,

//协议头长度
protected static final int HEADER_LENGTH = 16;
//魔数 判断是否是Dubbo协议
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
//设置请求响应标志位
protected static final byte FLAG_REQUEST = (byte) 0x80;
//单向还是双向标志位
protected static final byte FLAG_TWOWAY = (byte) 0x40;
//是否事件消息标志位
protected static final byte FLAG_EVENT = (byte) 0x20;
//序列化协议标志位
protected static final int SERIALIZATION_MASK = 0x1f;

通过核心字段我们可以发现其实和我们介绍的Dubbo的协议是一致的,因此接下来的encode和decode就是对Dubbo协议头的解密和编码,我们来下看encode方法,在encode方法中会根据需要编码的消息类型进行分类, 分为三类:Request、Response、telenet,encodeRequest方法专门对Request对象进行编码,encodeResponse方法对Response对象进行编码。

@Override
  public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    //Request
    if (msg instanceof Request) {
      encodeRequest(channel, buffer, (Request) msg);
      //Response
    } else if (msg instanceof Response) {
      encodeResponse(channel, buffer, (Response) msg);
    } else {
      //telenet
      super.encode(channel, buffer, msg);
    }
  }
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
  Serialization serialization = getSerialization(channel, req);
  //存储协议头
  byte[] header = new byte[HEADER_LENGTH];
  // set magic number.
  Bytes.short2bytes(MAGIC, header);

  //设置协议头标志位
  header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

  if (req.isTwoWay()) {
    header[2] |= FLAG_TWOWAY;
  }
  if (req.isEvent()) {
    header[2] |= FLAG_EVENT;
  }

  //记录请求ID
  Bytes.long2bytes(req.getId(), header, 4);

  //序列化请求 并统计序列化以后字节数
  int savedWriteIndex = buffer.writerIndex();
  //将写入位置后移16位
  buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
  //请求序列化
  ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

  //是否心跳检查 为空就是心跳检查
  if (req.isHeartbeat()) {
    // heartbeat request data is always null
    bos.write(CodecSupport.getNullBytesOf(serialization));
  } else {
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    //事件序列化
    if (req.isEvent()) {
      //事件序列化
      encodeEventData(channel, out, req.getData());
    } else {
      //Dubbo请求序列化
      encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
      ((Cleanable) out).cleanup();
    }
  }

  bos.flush();
  bos.close();
  //获取字节数
  int len = bos.writtenBytes();
  //检查字节长度
  checkPayload(channel, len);
  //将字节数写入header数组中
  Bytes.int2bytes(len, header, 12);

  //重置写入位置
  buffer.writerIndex(savedWriteIndex);
  //写入消息头
  buffer.writeBytes(header);
  //buffer写出去的位置从writeIndex开始 加上header长度 数据长度
  buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
  int savedWriteIndex = buffer.writerIndex();
  try {
    //序列化
    Serialization serialization = getSerialization(channel, res);
    //协议头  长度为16字节
    byte[] header = new byte[HEADER_LENGTH];
    //魔数
    Bytes.short2bytes(MAGIC, header);
    //序列化方式
    header[2] = serialization.getContentTypeId();
    //心跳还是正常消息
    if (res.isHeartbeat()) {
      header[2] |= FLAG_EVENT;
    }
    //响应状态
    byte status = res.getStatus();
    header[3] = status;
    //设置请求ID
    Bytes.long2bytes(res.getId(), header, 4);

    //写入时候真需要加上协议头长度
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

    //对响应信息进行编码
    if (status == Response.OK) {
      if(res.isHeartbeat()){
        //心跳
        bos.write(CodecSupport.getNullBytesOf(serialization));
      }else {
        //正常响应
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (res.isEvent()) {
          encodeEventData(channel, out, res.getResult());
        } else {
          encodeResponseData(channel, out, res.getResult(), res.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
          ((Cleanable) out).cleanup();
        }
      }
    } else {
      //错误消息
      ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
      out.writeUTF(res.getErrorMessage());
      out.flushBuffer();
      if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
      }
    }

    bos.flush();
    bos.close();

    //写入的长度
    int len = bos.writtenBytes();
    //检查消息长度
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);
    //重置写入位置
    buffer.writerIndex(savedWriteIndex);
    //写入消息头
    buffer.writeBytes(header);
    //buffer写出去的位置从writeIndex开始 加上header长度 数据长度
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
  } catch (Throwable t) {
    // clear buffer
    buffer.writerIndex(savedWriteIndex);
    // send error message to Consumer, otherwise, Consumer will wait till timeout.
    if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
      Response r = new Response(res.getId(), res.getVersion());
      r.setStatus(Response.BAD_RESPONSE);

      if (t instanceof ExceedPayloadLimitException) {
        logger.warn(t.getMessage(), t);
        try {
          r.setErrorMessage(t.getMessage());
          channel.send(r);
          return;
        } catch (RemotingException e) {
          logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
        }
      } else {
        // FIXME log error message in Codec and handle in caught() of IoHanndler?
        logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
        try {
          r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
          channel.send(r);
          return;
        } catch (RemotingException e) {
          logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
        }
      }
    }

    // Rethrow exception
    if (t instanceof IOException) {
      throw (IOException) t;
    } else if (t instanceof RuntimeException) {
      throw (RuntimeException) t;
    } else if (t instanceof Error) {
      throw (Error) t;
    } else {
      throw new RuntimeException(t.getMessage(), t);
    }
  }
}

ExchangeCodec的decode方法是encode方法的逆过程,会先检查魔数,然后读取协议头和后续消息的长度,最后根据协议头中的各个标志位构造相应的对象,以及反序列化数据。

DubboCodec

在ExchangeCodecencode的encode方法中,不论是encodeRequest还是encodeResponse都调用encodeRequestData方法,该方法会对Boby内容进行编码,该方法实现是在DubboCodec中,因此DubboCodec是对消息体的编解码,接下来我们来看下encodeRequestData和encodeResponseData方法的实现,

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
  RpcInvocation inv = (RpcInvocation) data;

  //dubbo服务版本
  out.writeUTF(version);
  // https://github.com/apache/dubbo/issues/6138
  String serviceName = inv.getAttachment(INTERFACE_KEY);
  if (serviceName == null) {
    //服务path
    serviceName = inv.getAttachment(PATH_KEY);
  }
  //服务名
  out.writeUTF(serviceName);
  //版本号
  out.writeUTF(inv.getAttachment(VERSION_KEY));
  //方法名
  out.writeUTF(inv.getMethodName());
  //方法类型描述
  out.writeUTF(inv.getParameterTypesDesc());
  Object[] args = inv.getArguments();
  if (args != null) {
    for (int i = 0; i < args.length; i++) {
      //参数值
      out.writeObject(encodeInvocationArgument(channel, inv, i));
    }
  }
  //附加属性
  out.writeAttachments(inv.getObjectAttachments());
}
@Override
  protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
  Result result = (Result) data;
  //检验版本
  boolean attach = Version.isSupportResponseAttachment(version);
  Throwable th = result.getException();
  if (th == null) {
    Object ret = result.getValue();
    if (ret == null) {
      //空结果
      out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
    } else {
      //正常写入
      out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
      out.writeObject(ret);
    }
  } else {
    //异常
    out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
    out.writeThrowable(th);
  }

  if (attach) {
    //Dubbo版本号
    result.getObjectAttachments().put(DUBBO_VERSION_KEY, Version.getProtocolVersion());
    out.writeAttachments(result.getObjectAttachments());
  }
}

结束

欢迎大家点点关注,点点赞!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/21633.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【FreeRTOS】FreeRTOS删除任务vTaskDelete()

使用说明&#xff1a; 任务中。小时 &#xff08;任务句柄_t xTask&#xff09;; INCLUDE_vTaskDelete必须定义为1&#xff0c;才能使用此函数。有关更多信息&#xff0c;请参见RTOS配置文档。 从RTOS内核管理中删除任务。正在删除的任务将从所有就绪、阻止、暂停和事件列表中删…

CEAC 之《计算机应用助理工程师》1

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;微微的猪食小窝 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 微微的猪食小窝 原创 收录于专栏 【CEAC证书】 1组合框有3种不同的类型&#xff0c;这3种类型是下拉式组合框、简单组合框、下拉式列表框&…

12. PyQt5实现多页面切换之QTabBar

PyQt5 QTabBar 类 QTabBar 类直接继承自 QWidget。该类提供了一个选项卡栏&#xff0c;该类仅提供了一个选项卡&#xff0c; 并没有为每个选项卡提供相应的页面&#xff0c;因此要使选项卡栏实际可用&#xff0c;需要自行为每个选项 卡设置需要显示的页面&#xff0c;可以通过 …

【k8s】6、pod详解

文章目录一、pod介绍1、pod的基础概念2、pod定义&#xff08;资源清单&#xff09;二、Pod中的容器配置1、基本配置2、镜像拉取&#xff08;imagePullPolicy&#xff09;3、启动命令&#xff08;command&#xff09;4、环境变量&#xff08;env&#xff09;5、端口设置&#xf…

应急响应-计划任务排查

计划任务排查 由于很多计算机都会自动加载“计划任务”&#xff0c;“计划任务”也是恶意病毒实现持久化驻留的一种常用手段&#xff0c;因此在应急响应事件排查时需要进行排查。通俗的讲会定期执行某些操作。 Windows计划任务排查 任务计划是Windows系统的一个预置实现某些…

【数据结构】二叉树的顺序存储结构 —— 堆

文章目录前言二叉树的顺序存储堆的概念和结构堆的实现结构的定义接口总览初始化销毁插入向上调整删除向下调整取堆顶数据计算堆大小判空打印堆完整代码Heap.hHeap.ctest.c结语前言 今天&#xff0c;我们开始二叉树的学习。本篇博客的内容为 介绍二叉树的顺序存储 和 堆的实现。…

【滤波跟踪】基于matlab不变扩展卡尔曼滤波器对装有惯性导航系统和全球定位系统IMU+GPS进行滤波跟踪【含Matlab源码 2232期】

⛄一、简介 针对室内定位中的非视距&#xff08;Non-Line-of-Sight,NLOS&#xff09;现象,提出一个新型算法进行识别,同时有效缓解其影响.主要通过超宽带&#xff08;Ultra-Wideband,UWB&#xff09;定位系统与惯性导航系统&#xff08;Inertial Navigation System,INS&#x…

酒店管理系统的设计与实现

Word下载链接如下&#xff1a; https://download.csdn.net/download/yw1990128/87096359 一 设计背景 1.1 课题现状 随着国家社会经济水平的提升&#xff0c;各酒店的发展速度越来越快&#xff0c;入住人员也越来越多。酒店房间的管理要求也愈来愈大&#xff0c;所以很多酒店正…

45.Django模板

1.django模板配置 1.1 Django模板概述 作为一个Web框架&#xff0c;Django需要一种方便的方式来动态生成HTML。最常用的方法依赖于模板。模板包含所需HTML输出的静态部分以及描述如何插入动态内容的特殊语法。 ​ 对模板引擎的一般支持和Django模板语言的实现都存在于 djang…

Linux下的NFS服务(包含windows10下的nfs搭建)

目录 1.NFS服务介绍 2.Linux下搭建NFS服务 &#xff08;1&#xff09;下载NFS服务端 &#xff08;2&#xff09;新建一个共享文件 &#xff08;3&#xff09;修改NFS服务配置文件 &#xff08;4&#xff09;重新启动NFS服务 &#xff08;5&#xff09;显示查看共享的文件…

38、常用类之String类

1、基本介绍&#xff1a; String s5new String(byte[] b)&#xff1b; &#xff08;5&#xff09;String实现了Serializable&#xff0c;说明String可以串行化&#xff0c;即可以网络传输 String实现了Comparable&#xff0c;说明String对象可以比较 &#xff08;6&#xff0…

JavaScript基础(13)_原型、原型对象

上一章构造函数确实简化了多个对象创建的麻烦问题&#xff0c;但是&#xff1a;构造函数每创建一个实例&#xff0c;构造函数就会执行一次&#xff0c;将属性和方法添加到该对象&#xff0c;每个对象实例化后地址互不相同&#xff0c;即使它们的方法所实现的逻辑和功能一样&…

pytorch初学笔记(八):神经网络之卷积操作

目录 一、卷积操作 二、二维卷积操作 2.1 torch.nn.functional 2.2 conv2d方法介绍 2.2.1 使用该方法需要引入的参数 2.2.2 常用参数 2.2.3 关于对input和weight的shape详解 三、代码实战 3.1 练习要求 3.2 tensor的reshape操作 3.3 不同stride的对比 3.4 不同pad…

Docker面试

1. Docker和虚拟机的区别&#xff1f; 虚拟机Virtual Machine与容器化技术&#xff08;代表Docker&#xff09;都是虚拟化技术&#xff0c;两者的区别在于虚拟化的程度不同。 隔离性 由于vm对操作系统也进行了虚拟化&#xff0c;隔离的更加彻底。而Docker共享宿主机的操作系统…

数字化转型总体需求

基于“两型三化九力”对企业数字化的要求&#xff0c;以建设产品全生命周期管理平台为手段和途径&#xff0c;打通设计、工艺、制造及交付服务的全生命周期的数字线&#xff0c;实现数字化设计、数字化仿真、数字化制造、数字化服务及数字化管理&#xff0c;未来以此为基础实现…

【计算机毕业设计】11.毕业生信息管理系统+vue

一、系统截图&#xff08;需要演示视频可以私聊&#xff09; 摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 毕业生信息招聘平台&#xff0c;主要的模块包括查看管理员&#xff1b;首页、个…

zk常用命令ls、ls2、get、stat,参数意思(重补早期学习记录)

前言:补学习记录,几年前写一半丢草稿箱,突然看到,有强迫症所以补完 1.连接zk客户端(进入zk后台) ./zkCli.sh 连接成功 使用help查看有哪些命令可以使用 试试ls和ls2的区别 ls显示指定路径下的目录 ls2不仅可以 显示指定路径下的目录,还可以显示该节点的相关状态信息…

OpenGL 单色

目录 一.OpenGL 单色图 1.IOS Object-C 版本1.Windows OpenGL ES 版本2.Windows OpenGL 版本 二.OpenGL 单色 GLSL Shader三.猜你喜欢 零基础 OpenGL ES 学习路线推荐 : OpenGL ES 学习目录 >> OpenGL ES 基础 零基础 OpenGL ES 学习路线推荐 : OpenGL ES 学习目录 >…

非关系型数据库MongoDB是什么/SpringBoot如何使用或整合MongoDB

写在前面&#xff1a; 继续记录自己的SpringBoot学习之旅&#xff0c;这次是SpringBoot应用相关知识学习记录。若看不懂则建议先看前几篇博客&#xff0c;详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用&#xff01; 3.4.3.3 Mongodb 3.4.3.3.1 介绍 MongoDB是一个开…

【Tomcat专题】Tomcat如何打破双亲委派机制?

文章目录类加载器双亲委派机制双亲委派的好处Tomcat的类加载器loadClass总体加载步骤&#xff1a;类加载器 三种JDK内部的类加载器 启动类加载器&#xff08;BootStrap ClassLoader&#xff09; 负责加载JRE\lib下的rt.jar、resources.jar、charsets.jar包中的class。 扩展…