基于JavaSocket重写Dubbo网络传输层

news2025/1/23 13:01:44

前言

我们知道,位于 Serialize 层上面的是负责网络传输的 Transport 层,它负责调用编解码器 Codec2 把要传输的对象编码后传输、再对接收到的字节序列解码。

站在客户端的角度,一次 RPC 调用的流程大概是这样的:

  • Invoker 发起 RPC 调用请求
  • Exchange 层负责数据交换,实现 Request-Response 语义
  • Transport 层调用编码器对 Request 编码后发送,主线程阻塞等待
  • IO 线程读取到服务端响应的数据,解码器解码后得到结果,唤醒主线程

image.png
清楚这个流程之后,我们尝试把 Dubbo 默认用 Netty 实现的传输层替换成我们自己实现的。
特别声明:Netty 已经做的足够好了,我们这么做并没有什么意义,只是为了加深你对 Dubbo 传输层工作流程的理解。

自定义Transport

新建一个模块dubbo-extension-transport-javasocket用来封装我们自己的传输层实现。
因为要写的是 Dubbo 传输层的一个实现策略,所以要依赖dubbo-remoting-api

<dependencies>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-remoting-api</artifactId>
        <version>${dubbo.version}</version>
    </dependency>
</dependencies>

Transporter

传输层的核心 SPI 接口是 org.apache.dubbo.remoting.Transporter,我们自己实现一个。
JavaSocketTransporter 的核心是:

  • Dubbo 开启服务时创建 JavaSocketServer
  • 客户端和服务端建立连接时创建 JavaSocketClient
public class JavaSocketTransporter implements Transporter {

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new JavaSocketServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new JavaSocketClient(url, handler);
    }
}

Channel

Dubbo 抽象了 org.apache.dubbo.remoting.Channel 接口来表示一个 tcp 连接,我们用的 Java Socket 实现,对应的类是 java.nio.channels.SocketChannel。但是我们要写一个类来把 SocketChannel 适配成 Dubbo 的 Channel。
Java SocketChannel 并不支持维护属性,Dubbo Channel 是支持的,所以我们专门搞个 Map 记录一下。

public class JavaSocketChannel extends AbstractChannel {

    private static final ConcurrentMap<SocketChannel, JavaSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
    private final SocketChannel socketChannel;
    private final Codec2 codec;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();

    private JavaSocketChannel(Codec2 codec, SocketChannel sc, URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = codec;
        this.socketChannel = sc;
    }

    static JavaSocketChannel getOrAddChannel(Codec2 codec, SocketChannel sc, URL url, ChannelHandler handler) {
        JavaSocketChannel javaSocketChannel = CHANNEL_MAP.get(sc);
        if (javaSocketChannel == null) {
            javaSocketChannel = new JavaSocketChannel(codec, sc, url, handler);
        }
        return javaSocketChannel;
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        try {
            return (InetSocketAddress) socketChannel.getRemoteAddress();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isConnected() {
        return socketChannel.isConnected();
    }

    @Override
    public boolean hasAttribute(String key) {
        return attributes.containsKey(key);
    }

    @Override
    public Object getAttribute(String key) {
        return attributes.get(key);
    }

    @Override
    public void setAttribute(String key, Object value) {
        attributes.put(key, value);
    }

    @Override
    public void removeAttribute(String key) {
        attributes.remove(key);
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        try {
            return (InetSocketAddress) socketChannel.getLocalAddress();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
        try {
            ChannelBuffer channelBuffer = ChannelBuffers.directBuffer(1024);
            codec.encode(this, channelBuffer, message);
            socketChannel.write(channelBuffer.toByteBuffer());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

RemotingServer

再看服务端,Dubbo 提供了抽象类 org.apache.dubbo.remoting.transport.AbstractServer 实现了一些通用的逻辑,我们的 JavaSocketServer 直接继承它即可。
Dubbo 启动暴露服务时,会一并开启我们给定的 JavaSocketServer,方法是doOpen
JavaSocketServer 核心是:

  • 通过 ServerSocketChannel 绑定本地端口来开启一个服务
  • 处理客户端连接
  • 网络IO数据读取、解码
public class JavaSocketServer extends AbstractServer {

    private ServerSocketChannel serverSocketChannel;
    private Map<SocketChannel, JavaSocketChannel> channelMap;

    public JavaSocketServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    }

    @Override
    protected void doOpen() throws Throwable {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(getBindAddress());
        serverSocketChannel.configureBlocking(false);
        channelMap = new ConcurrentHashMap<>();
        // 开启一个线程来读网络数据
        new Thread(() -> {
            try {
                while (true) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if (socketChannel != null) {
                        // 建立新连接,丢到Map
                        socketChannel.configureBlocking(false);
                        channelMap.put(socketChannel, JavaSocketChannel.getOrAddChannel(getCodec(), socketChannel, getUrl(), getChannelHandler()));
                    }
                    // 遍历所有连接,看看是否有数据可读
                    for (Map.Entry<SocketChannel, JavaSocketChannel> entry : channelMap.entrySet()) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 10);
                        int length = entry.getKey().read(byteBuffer);
                        if (length > 0) {
                            byteBuffer.flip();
                            // 读到新数据,尝试解码,交给后续handler处理
                            Object decode = getCodec().decode(entry.getValue(), ChannelBuffers.wrappedBuffer(byteBuffer));
                            if (decode != null) {
                                getDelegateHandler().received(entry.getValue(), decode);
                            }
                        }
                    }
                    Thread.sleep(10);// sleep一会,避免空转
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    @Override
    protected void doClose() throws Throwable {
        if (serverSocketChannel != null) {
            serverSocketChannel.close();
        }
    }

    @Override
    public boolean isBound() {
        return serverSocketChannel.isOpen();
    }

    @Override
    public Collection<Channel> getChannels() {
        return null;
    }

    @Override
    public Channel getChannel(InetSocketAddress remoteAddress) {
        return null;
    }
}

Client

最后是客户端,Dubbo 提供了抽象类 org.apache.dubbo.remoting.transport.AbstractClient 实现了一些通用逻辑,我们的 JavaSocketClient 也直接继承它即可。
JavaSocketClient 核心是:

  • 和服务端建立连接
  • Request 对象编码后发送
  • 读取服务端响应的数据、解码
public class JavaSocketClient extends AbstractClient {

    private SocketChannel socketChannel;

    public JavaSocketClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    }

    @Override
    protected void doOpen() throws Throwable {
        socketChannel = SocketChannel.open();
        System.err.println("client open");
    }

    @Override
    protected void doClose() throws Throwable {

    }

    @Override
    protected void doConnect() throws Throwable {
        System.err.println("client connet");
        if (socketChannel.connect(getConnectAddress())) {
            new Thread(() -> {
                try {
                    while (true) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 10);
                        int length = socketChannel.read(byteBuffer);
                        if (length > 0) {
                            byteBuffer.flip();
                            Object decode = getCodec().decode(getChannel(), ChannelBuffers.wrappedBuffer(byteBuffer));
                            getDelegateHandler().received(getChannel(), decode);
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    @Override
    protected void doDisConnect() throws Throwable {

    }

    @Override
    protected Channel getChannel() {
        return JavaSocketChannel.getOrAddChannel(getCodec(), socketChannel, getUrl(), getChannelHandler());
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        try {
            ChannelBuffer channelBuffer = ChannelBuffers.directBuffer(1024);
            getCodec().encode(getChannel(), channelBuffer, message);
            socketChannel.write(channelBuffer.toByteBuffer());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

至此,自定义的传输层逻辑就写完了。接下来是让 Dubbo 加载并使用我们自定义的实现,可以通过 SPI 机制。
创建META-INF/dubbo/org.apache.dubbo.remoting.Transporter文件,编写内容:

javasocket=dubbo.extension.remoting.transport.javasocket.JavaSocketTransporter

服务端可以在 ProtocolConfig 里指定:

ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", 20880);
protocolConfig.setServer("javasocket");

客户端可以在 ReferenceConfig 里配置参数指定:

Map<String, String> parameters = new HashMap<>();
parameters.put("client", "javasocket");

ReferenceConfig.setParameters(parameters);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1395030.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CSS实现的 Loading 效果

方式一、纯CSS实现 代码&#xff1a;根据需要复制 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>CSS Animation Library for Developers and Ninjas</title><style>/* ---------------…

操作系统课程设计-Windows 线程的互斥和同步

目录 前言 1 实验题目 2 实验目的 3 实验内容 3.1 步骤 3.2 关键代码 3.2.1 创建生产者和消费者进程 3.2.2 生产者和消费者进程 4 实验结果与分析 5 代码 前言 本实验为课设内容&#xff0c;博客内容为部分报告内容&#xff0c;仅为大家提供参考&#xff0c;请勿直接抄…

SqlAlchemy使用教程(五) ORM API 编程入门

SqlAlchemy使用教程(一) 原理与环境搭建SqlAlchemy使用教程(二) 入门示例及编程步骤SqlAlchemy使用教程(三) CoreAPI访问与操作数据库详解SqlAlchemy使用教程(四) MetaData 与 SQL Express Language 的使用SqlAlchemy使用教程(五) ORM API 编程入门 前一章用SQL表达式(SQL Expr…

机器学习之卷积神经网络

卷积神经网络是一类包含卷积计算且具有深度结构的前馈神经网络,是深度学习的代表算法之一。卷积神经网络具有表征学习能力,能够按其阶层结构对输入信息进行平移不变分类,因此又称为SIANN。卷积神经网络仿照生物的视知觉机制构建,可以进行监督学习和非监督学习,其隐含层内的…

【Internet Protocol】ip介绍,如何组局域网实现远程桌面和文件共享

文章目录 1.何为“上网”1.1 定义1.2 为什么连了WiFi就能上网了&#xff1f; 2.ip2.1 什么是ip2.2 为什么区分广域网和局域网&#xff0c;ip的唯一性2.3 如何查看设备的ip2.4 什么叫"ping"2.5 区分是否两个ip是否在同一局域网2.5.1 最稳妥的方式&#xff1a;ip&m…

Linux第31步_了解STM32MP157的TF-A

了解STM32MP157的TF-A&#xff0c;为后期移植服务。 一、指令集 ARMV8提供了两种指令集:AAarch64和AArch32&#xff0c;根据字面意思就是64位和32位。 ARMV7提供的指令集是AArch32。 二、TF-A 指令集是AArch64的芯片&#xff0c;TF-A有&#xff1a;bl1、bl2、bl31、bl32 和…

【Linux】进入一个目录需要什么权限-目录的权限

Linux目录权限 在Linux中&#xff0c;目录也是文件&#xff0c;是文件就有属性&#xff0c;就有权限 在Linux中&#xff0c;我们可以通过cd命令进入目录 那么我们要进入一个目录&#xff0c;需要有什么权限呢&#xff1f; 目录和普通文件一样&#xff0c;也是有权限的 测试证…

分布式Erlang/OTP(学习笔记)(一)

Erlang分布式基础 假设你在机器A和机器B上各跑着一个Simple Cache应用的实例。要是在机器A的缓存上插人一个键/值对之后&#xff0c;从机器B上也可以访问&#xff0c;那可就好了。显然&#xff0c;要达到这个目的&#xff0c;机器A必须以某种方式将相关信息告知给机器B。传递该…

Cinder组件作用

1、Cinder下发的流程 &#xff08;1&#xff09;Cinder-api接受上层发送的创建请求&#xff0c;然后把请求下发给Cinder-scheduler调度服务 &#xff08;2&#xff09;Cinder-scheduler调度服务&#xff0c;计算出哪个主机更适合创建&#xff0c;计算出来之后再把请求下发到Ci…

查询数据库表字段具有某些特征的表

目录 引言举例总结 引言 当我们把一个项目做完以后&#xff0c;客户要求我们把系统中所有的电话&#xff0c;证件号等进行加密处理时&#xff0c;我们难道要一个表一表去查看那些字段是电话和证件号码吗&#xff1f; 这种办法有点费劲&#xff0c;下面我们来探索如何找到想要的…

【大数据】Flink 测试利器:DataGen

Flink 测试利器&#xff1a;DataGen 1.什么是 FlinkSQL &#xff1f;2.什么是 Connector &#xff1f;3.DataGen Connector3.1 Demo3.2 支持的类型3.3 连接器属性 4.DataGen 使用案例4.1 场景一&#xff1a;生成一亿条数据到 Hive 表4.2 场景二&#xff1a;持续每秒生产 10 万条…

进程间通信之匿名管道通信

每一次的努力都是自我成长的一步&#xff0c;坚持不懈的付出会铺就通向成功的道路。文章目录 进程间通信的介绍进程间通信的发展进程间通信的分类进程间通讯的本质资源&#xff1f;这个资源谁提供的&#xff1f; 管道什么是管道匿名管道管道小总结现在我给大家看一下管道通信的…

SCDN高防如何保护你的服务器

随着互联网的发展&#xff0c;如今的网络世界&#xff0c;虽说给我们的衣食住行带来了非常大的便利&#xff0c;但同时它存在着各种各样的威胁。比如我们的网站&#xff0c;如果不做任何保护措施的话&#xff0c;就很容易被DDoS、CC等攻击堵塞网络、窃取目标系统的信息&#xf…

这种网页要小心!注意你的账号密码泄露!

目录 H5是泄露账号和数据的重要渠道 代码混淆是最佳的安全保护手段 基于AI的自适应代码混淆 我们经常见到各类H5海报&#xff0c;产品展示、活动促销、招聘启事等。H5不仅能够无缝地嵌入App、小程序&#xff0c;还可以作为一个拥有独立链接地址的页面&#xff0c;直接在PC端打开…

AIOps案例 | 携手擎创,中邮信科成功打造新一代IT智能运维平台,收益明显!

为推动邮政信息科技体制改革、提升信息科技自主供给能力&#xff0c;在原信息技术局、数据中心和软开中心基础上&#xff0c;中邮信息科技&#xff08;北京&#xff09;有限公司(简称“中邮信科公司”)经中国邮政集团有限公司于2019年5月被批准成立。 公司主要负责邮政各类信息…

[论文阅读]DeepFusion

DeepFusion Lidar-Camera Deep Fusion for Multi-Modal 3D Object Detection 用于多模态 3D 物体检测的激光雷达相机深度融合 论文网址&#xff1a;DeepFusion 论文代码&#xff1a;DeepFusion 摘要 激光雷达和摄像头是关键传感器&#xff0c;可为自动驾驶中的 3D 检测提供补…

通过OpenIddict设计一个授权服务器03-客户凭证流程

在本部分中&#xff0c;我们将把 OpenIddict 添加到项目中&#xff0c;并实施第一个授权流程&#xff1a;客户端凭证流。 添加 OpenIddict 软件包 首先&#xff0c;我们需要安装 OpenIddict NuGet 软件包 dotnet add package OpenIddict dotnet add package OpenIddict.AspN…

Android CarService源码分析

文章目录 一、CarService的基本架构1.1、Android Automative整体框架1.2、Framework CarService1.3、目录结构1.3.1、CarService1.3.2、Car APP 二、CarService的启动流程2.1、系统启动后在SystemServer进程中启动CarServiceHelperService2.2、CarService启动 三、CarService源…

浅聊雷池社区版(WAF)的tengine

雷池社区版是一个开源的免费Web应用防火墙&#xff08;WAF&#xff09;&#xff0c;专为保护Web应用免受各种网络攻击而设计。基于强大的Tengine&#xff0c;雷池社区版提供了一系列先进的安全功能&#xff0c;适用于中小企业和个人用户。 Tengine的故事始于2011年&#xff0c;…

Android-三方框架的源码

ARouter Arouter的整体思路是moduelA通过中间人ARouter把路由信息的存到仓库WareHouse&#xff1b;moduleB发起路由时&#xff0c;再通过中间人ARouter从仓库WareHouse取出路由信息&#xff0c;这要就实现了没有依赖的两者之间的跳转与通信。其中涉及Activity的跳转、服务prov…