Dubbo分层设计之Transport层

news2025/1/15 7:27:23

前言

Dubbo 框架采用分层设计,最底下的 Serialize 层负责把对象序列化为字节序列,再经过 Transport 层网络传输到对端。一次 RPC 调用,在 Dubbo 看来其实就是一段请求报文和一段响应报文的传输过程。

理解Transport

Transport 层即网络传输层,它在 Serialize 的上层,Exchange 的下层,起到一个承上启下的作用。
有很多网络库可以做网络传输,比如 Netty、Mina、甚至是 JDK 原生的 Socket,但是这些库的使用方式和对外接口都不一样,如果直接依赖三方库开发,后续更换实现方案就非常麻烦了,违背了开闭原则。
所以 Transport 层对网络传输做了抽象,它把 Netty、Mina 封装成统一接口,上层面向接口编程,具体实现可以轻松替换。
Transport 层只负责数据的传输,至于要传输什么数据它是不关心的,也不应该关心。传输的数据格式和通信协议息息相关,应该由协议层去定义。

设计实现

Transport 层和 Exchange 层的代码都位于dubbo-remoting模块,同样的,dubbo-remoting-api模块只定义抽象接口,其它子模块负责具体实现。
Dubbo 官方内置的 remoting 模块:

<modules>
    <module>dubbo-remoting-api</module>
    <module>dubbo-remoting-netty</module>
    <module>dubbo-remoting-mina</module>
    <module>dubbo-remoting-grizzly</module>
    <module>dubbo-remoting-p2p</module>
    <module>dubbo-remoting-http</module>
    <module>dubbo-remoting-zookeeper</module>
    <module>dubbo-remoting-netty4</module>
    <module>dubbo-remoting-etcd3</module>
</modules>

抽象层

Transport 层的核心SPI接口是 Transporter,默认实现是 Netty。

@SPI("netty")
public interface Transporter {

    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

接口定义了两个方法,bind()用于服务端绑定本地接口,connect()用于客户端和服务器建立连接。

ChannelHandler 接口用来定义 Channel 事件,Dubbo 目前定义了五个事件,分别是:

  • connected:连接事件
  • disconnected:连接断开事件
  • sent:数据发送事件
  • received:数据接收事件
  • caught:异常事件
@SPI
public interface ChannelHandler {

    void connected(Channel channel) throws RemotingException;

    void disconnected(Channel channel) throws RemotingException;

    void sent(Channel channel, Object message) throws RemotingException;

    void received(Channel channel, Object message) throws RemotingException;

    void caught(Channel channel, Throwable exception) throws RemotingException;
}

Channel 接口抽象的是一个tcp连接,它继承自 Endpoint,代表它也是一个端点。既然是连接,那自然就拥有发送数据、主动断开、读写属性等能力。

public interface Channel extends Endpoint {

    // 远程地址
    InetSocketAddress getRemoteAddress();

    // 是否连接
    boolean isConnected();

    /****属性的读写****/
    boolean hasAttribute(String key);

    Object getAttribute(String key);

    void setAttribute(String key, Object value);

    void removeAttribute(String key);
}
public interface Endpoint {

    URL getUrl();

    // Channel事件处理器
    ChannelHandler getChannelHandler();

    // 本地地址
    InetSocketAddress getLocalAddress();

    // 发送数据
    void send(Object message) throws RemotingException;
    void send(Object message, boolean sent) throws RemotingException;

    // 关闭连接
    void close();
    void close(int timeout);

    void startClose();

    boolean isClosed();
}

RemotingServer 抽象的是服务器接口,通过绑定本地接口可以获得一个服务器对象,它会维护所有和它建立连接的 Channel。

public interface RemotingServer extends Endpoint, Resetable, IdleSensible {

    // 是否绑定
    boolean isBound();
    // 拿所有连接
    Collection<Channel> getChannels();
    // 根据远程地址拿连接
    Channel getChannel(InetSocketAddress remoteAddress);

    @Deprecated
    void reset(org.apache.dubbo.common.Parameters parameters);
}

Client 接口抽象的是客户端,它继承自 Channel,所以它也是一个连接,可以向远程发送数据。

public interface Client extends Endpoint, Channel, Resetable, IdleSensible {

    // 重连
    void reconnect() throws RemotingException;

    @Deprecated
    void reset(org.apache.dubbo.common.Parameters parameters);
}

除了围绕 Transporter 的这些接口,传输层还有一个很重要的接口。
Codec2 是网络编解码器的抽象接口,我们代码里发送的是 Object,对象本身不能通过网络传输,得经过编码器把它编码为字节序列才能发送。同样的,对端收到的也是一段字节序列,得经过解码器按照相同的规则解码为 Object。

@SPI
public interface Codec2 {

    // 编码
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    // 解码
    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;

    enum DecodeResult {
        NEED_MORE_INPUT,
        SKIP_SOME_INPUT
    }
}

实现层

直接看默认实现,基于 Netty4 的 org.apache.dubbo.remoting.transport.netty4.NettyTransporter。

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}

NettyServer

image.png
基于 Netty 实现的服务器类是 NettyServer,继承关系比较复杂,看看每一层构造函数都干了啥。
AbstractPeer 主要是保存 URL 和 ChannelHandler

public AbstractPeer(URL url, ChannelHandler handler) {
    this.url = url;
    this.handler = handler;
}

AbstractEndpoint 通过 SPI 加载编解码器 Codec2 的实现

public AbstractEndpoint(URL url, ChannelHandler handler) {
    super(url, handler);
    this.codec = getChannelCodec(url);
    this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}

AbstractServer 构造函数是个模板方法,调用子类的doOpen开启服务,然后创建线程池

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    // 获取绑定的IP和端口
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    // 最大连接数、连接空闲超时时间
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen(); // 开启服务
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    // 业务线程池
    executor = executorRepository.createExecutorIfAbsent(url);
}

NettyServer 是具体实现,主要是基于 Netty 开启服务,常规的 ServerBootstrap 启动流程,我们重点关注的是 Dubbo 对 ChannelPipeline 的编排。

  • 如果开启 ssl,会插入一个 SslServerTlsHandler
  • 接着插入编解码器
  • IdleStateHandler 用于关闭超时闲置的连接
  • NettyServerHandler 主要是对我们的业务处理器 handler 再包装了一层
@Override
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    // Accept线程
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
    // IO线程 CPU核心数+1 最大不会超过32
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            "NettyServerWorker");
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        // 开启ssl
                        ch.pipeline().addLast("negotiation",
                                SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                    }
                    // 编排ChannelPipeline
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            /**
                             * @see HeaderExchanger#bind(URL, ExchangeHandler)
                             */
                            .addLast("handler", nettyServerHandler);
                }
            });
    // 绑定端口,同步等待完成
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

NettyClient

基于 Netty 实现的客户端类是 org.apache.dubbo.remoting.transport.netty4.NettyClient。
image.png
基类是一样的,用于初始化编解码器 Codec2。AbstractClient 会调用子类doOpen开启客户端,紧接着调用connect和服务器建立连接。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    // 初始化线程池
    initExecutor(url);
    doOpen();// 开启客户端
    connect(); // 建立连接
}

doOpen也是常规的 Netty Bootstrap 启动流程,ChannelPipeline 的编排和 Server 端一致。

@Override
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(NIO_EVENT_LOOP_GROUP)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(socketChannelClass());

    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
            }

            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);

            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

doConnect会调用Bootstrap#connect和服务端建立连接,连接成功后会得到一个 Channel 对象,Dubbo 可以通过它给服务端发数据。

@Override
protected void doConnect() throws Throwable {
	ChannelFuture future = bootstrap.connect(getConnectAddress());
	boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
	if (ret && future.isSuccess()) {
        Channel newChannel = future.channel();
        NettyClient.this.channel = newChannel;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1385553.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机毕业设计----SSH在线水果商城平台含管理系统

项目介绍 本项目分为前后台&#xff0c;分为普通用户与管理员两个角色&#xff0c;前台为普通用户登录&#xff0c;后台为管理员登录&#xff1b; 管理员角色包含以下功能&#xff1a; 管理员登录,修改密码,类别管理,水果管理,订单管理,网站论坛管理,网站公告管理等功能。 …

抖音小店2024年创业新趋势,新手找项目,不要再错过这次的机会了

大家好&#xff0c;我是电商花花。 现在的抖音小店完全是电商创业中的一个优秀代名词和最轻便的创业项目&#xff0c;更是以独特的直播达人带货的优势将店铺激发出来。 今天给大家介绍下抖音小店的运作方式&#xff0c;并分析互联网创业的机遇&#xff0c;并提供相关的再做点…

Unity中URP下 SimpleLit框架

文章目录 前言一、整体框架1、该Shader是用于低端设备的2、包含一个Properties3、只有一个SubShader4、如果SubShader错误&#xff0c;返回洋葱紫5、调用自定义ShaderGUI面板 二、SubShader中1、Tags2、Pass 三、我们看一下ForwardLit的Pass1、混合模式、深度写入、面皮剔除、透…

ZooKeeper 简介

1、概念介绍 ZooKeeper 是一个开放源码的分布式应用程序协调服务&#xff0c;为分布式应用提供一致性服务的软件&#xff0c;由雅虎创建&#xff0c;是 Google Chubby 的开源实现&#xff0c;是 Apache 的子项目&#xff0c;之前是 Hadoop 项目的一部分&#xff0c;使用 Java …

提高执行力,关键在于管理者做到这四个字

执行力&#xff0c;对于个人而言&#xff0c;它就是办事的效能&#xff1b;而对于领导来说&#xff0c;它是管理的能力。 老板命令员工去买复印纸&#xff0c;员工第一次买回了一沓复印纸&#xff0c;第二次买了三摞复印纸&#xff0c;却仍然没有得到老板的满意。员工之所以跑…

Halcon滤波器 laplace 算子

Halcon滤波器 laplace 算子 使用laplace 算子对图像进行二次求导&#xff0c;会在边缘产生零点&#xff0c;因此该算子常常与zero_crossing算子配合使用。求出这些零点&#xff0c;也就得到了图像的边缘。同时&#xff0c;由于laplace算子对孤立像素的响应要比对边缘或线的响应…

element upload 自定义上传 报错Cannot set properties of null (setting ‘status‘)

element upload 自定义上传 报错Cannot set properties of null (setting ‘status’) 问题展示 原因分析 自定义上传方式 fileList 显示一切正常&#xff0c;状态也是成功 文件url通过URL.createObjectURL(file.raw) 进行添加 以下为配置代码 <el-uploadclass"uplo…

【K12】Python写串联电阻问题的求解思路解析

问题源代码 方法&#xff1a;calculate_circuit_parameter 构造题目&#xff1a; 模板&#xff1a; 已知电阻R1为 10Ω&#xff0c;电阻R2为 5Ω&#xff0c;电压表示数为2.5V&#xff0c;求电源电压U&#xff1f; 给合上面题目&#xff0c;利用Python程序&#xff0c;可以任…

【ScienceAI Weekly】DeepMind拆分的AI药企达成30亿美元新协议;网传字节跳动在美招聘生物/化学/物理人才

AI for Science 的新成果、新动态、新视角—— 由 DeepMind 拆分的 AI 药企首次达成制药合作&#xff0c;价值 30 亿美元微软协助科研人员发现 3,200 万种新电池材料网传 TikTok 在美国各地招募计算生物学、量子化学、分子动力学和物理方面的人才科大讯飞拟分拆医疗业务在港交…

遥感卫星影像现拍,哪里想看拍哪里!

我们为大家分享了查看实时卫星影像的方法。 虽然这个网站的卫星影像10分钟一更新&#xff0c;让世界尽收眼底&#xff0c;但分辨率却非常有限。 如果项目中需要更高清的卫星影像&#xff0c;且对时效性又有较高的要求&#xff0c;那么可以考虑用卫星专门拍摄。 光学遥感卫星…

为什么有人说PMP是水证,它的含金量到底怎么样?

在我国大陆&#xff0c;有好多证书被商业化得太重了&#xff0c;甚至演变成了个人或一些公司摇钱的工具。所以有些证书受人吹捧它崛起的快&#xff0c;但是活不长&#xff0c;甚至“夭折”&#xff0c;比如以前微软系列的证书&#xff1b; 而PMP认证从国外引进大陆这么多年了&…

【昕宝爸爸小模块】守护线程、普通线程、两者之间的区别

➡️博客首页 https://blog.csdn.net/Java_Yangxiaoyuan 欢迎优秀的你&#x1f44d;点赞、&#x1f5c2;️收藏、加❤️关注哦。 本文章CSDN首发&#xff0c;欢迎转载&#xff0c;要注明出处哦&#xff01; 先感谢优秀的你能认真的看完本文&…

打造完美跨境商城源码,助你轻松进军国际市场

随着全球化的深入&#xff0c;跨境电商已成为各国企业拓展国际市场的重要途径之一。根据最新数据显示&#xff0c;跨境电商市场规模逐年扩大&#xff0c;预计未来几年将保持较高增长率。因此&#xff0c;拥有一套完善的跨境商城源码成为企业进军国际市场的关键。 跨境商城源码…

Java--ListUtil工具类,实现将一个大列表,拆分成指定长度的子列表

文章目录 前言实现代码执行结果 前言 在项目中有时会出现列表很大&#xff0c;无法一次性批量操作&#xff0c;我们需要将列表分成指定大小的几个子列表&#xff0c;一份一份进行操作&#xff0c;本文提供这样的工具类实现这个需求。 实现代码 以下为ListUtil工具类代码实现…

【数据结构和算法】删除链表的中间节点

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 三、代码 四、复杂度分析 前言 这是力扣的1657题&#xff0c;难度为中等&#xff0c;解题方案有很多种&…

Java 树形结构数据生成导出excel文件V2

** >> 相对于V1版本&#xff0c;优化了代码逻辑&#xff0c;合理使用递归计算树数据的坐标 << ** 1、效果 2、使用方法 import com.alibaba.fastjson.JSONArray; import org.apache.poi.hssf.usermodel.HSSFWorkbook; import org.apache.poi.ss.usermodel.Workboo…

[HTML]Web前端开发技术12(HTML5、CSS3、JavaScript )——喵喵画网页

希望你开心&#xff0c;希望你健康&#xff0c;希望你幸福&#xff0c;希望你点赞&#xff01; 最后的最后&#xff0c;关注喵&#xff0c;关注喵&#xff0c;关注喵&#xff0c;佬佬会看到更多有趣的博客哦&#xff01;&#xff01;&#xff01; 喵喵喵&#xff0c;你对我真的…

多商户入驻系统APP源码系统:功能强大+分销+秒杀+拼团+砍价+优惠券+完整的安装代码包以及搭建教程

科技的不断发展&#xff0c;互联网在不断的进步&#xff0c;传统的商业形态正在逐步向数字化转型。在这个大背景下&#xff0c;多商户入驻系统APP源码系统应运而生&#xff0c;旨在为各类商家提供一个功能强大的线上商业平台&#xff0c;以提升其市场竞争力。该系统集成了丰富的…

Linux:NTP校时、PTP校时

目录 前言一、NTP校时1、简介2、ubuntu使用 NTP3、嵌入式设备使用 NTP 校时4、NTP 服务器的校时精度 二、PTP校时1、简介2、ubuntu使用 PTP3、嵌入式设备使用 PTP 校时 三、PTP 校时和 NTP 校时那个精度高一些 前言 在进行网络协议通信时&#xff0c;我们有时候需要计算通信的延…

【面试合集】说说提高微信小程序的应用速度的手段有哪些?

面试官&#xff1a;说说提高微信小程序的应用速度的手段有哪些&#xff1f; 一、是什么 小程序启动会常常遇到如下图场景&#xff1a; 这是因为&#xff0c;小程序首次启动前&#xff0c;微信会在小程序启动前为小程序准备好通用的运行环境&#xff0c;如运行中的线程和一些基…