【dubbo triple provider 底层流转】

news2025/1/12 20:10:28

一、maven依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-codec-http2</artifactId>
    <version>4.1.90.Final</version>
</dependency>

<dependency>
    <groupId>org.apache.dubbo</groupId>
    <artifactId>dubbo</artifactId>
    <version>3.1.8</version>
  <!--  <version>3.2.2</version>-->
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>


<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.3.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>5.3.0</version>
</dependency>


<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>4.0.0-rc-2</version>
</dependency>

二、demo代码

public interface TestTripleService {
    String sayHello(String name);
}

public class TestTripleServiceImpl implements TestTripleService{

    @Override
    public String sayHello(String name) {
        System.out.println(Thread.currentThread().getName() + " call :" + name);
        return "hello :" + name;
    }
}

public class TripleConsumer {
    public static void main(String[] args) throws IOException {
        System.setProperty("dubbo.application.logger", "slf4j");
        ReferenceConfig<TestTripleService> ref = new ReferenceConfig<>();
        ref.setInterface(TestTripleService.class);
        //ref.setCheck(false);
        ref.setProtocol(CommonConstants.TRIPLE);
       // ref.setLazy(true);
        ref.setTimeout(100000);
        ref.setApplication(new ApplicationConfig("triple-consumer"));
        ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2188"));
        final TestTripleService tripleService = ref.get();

        System.out.println("dubbo ref started");
        String result =  tripleService.sayHello("123");
        System.out.println(Thread.currentThread().getName() + " result :" + result);

    }
}


public class TripleProvider {
    public static void main(String[] args) throws InterruptedException {
        System.setProperty("dubbo.application.logger", "slf4j");
        ServiceConfig<TestTripleService> service = new ServiceConfig<>();
        service.setInterface(TestTripleService.class);
        service.setRef(new TestTripleServiceImpl());
        // 这里需要显示声明使用的协议为triple
        service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
        service.setApplication(new ApplicationConfig("triple-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2188"));
        service.export();
        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

三、其他相关配置

1、 jdk使用1.8

2、zookeeper使用3.7.1 【适配java jar版本】

四、调试问题

1、 一开始使用dubbo3.2.2的版本 provider 可以 正常启动,但是consumer启动报缺少一个类,发现的是dubbo自已的maven依赖冲突了, 优先读取了相同签名但是有缺陷的类

10:40:02.675 [main] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@62628e78
10:40:02.772 [NettyClientWorker-6-1] WARN io.netty.channel.ChannelInitializer - Failed to initialize a channel. Closing: [id: 0x961f8cf7]
java.lang.NoSuchMethodError: io.netty.handler.codec.http2.Http2FrameCodecBuilder: method <init>()V not found
     at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.<init>(TripleHttp2FrameCodecBuilder.java:32)
     at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.fromConnection(TripleHttp2FrameCodecBuilder.java:37)
     at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.forClient(TripleHttp2FrameCodecBuilder.java:45)

2、 换成3.1.8 版本之后, consumer 和 provider 都正常启动了,但是consumer这边拿不到provider的结果,观察日志发现,provider接收到了consumer的请求,并进行了日志的打印,但是没有走到最后的invoker进行调用,追踪代码调用发现,调用过程中会有一个异常抛出 【找不到类 com.google.protobuf.Message】这个像是依赖进行下载的时候没下载到,而且日志里面没有打印出这个异常,后续加上相关的依赖后,可以拿到provider的相关结果。

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>4.0.0-rc-2</version>
</dependency>

五、triple底层调用分析

1、入口类 TripleProtocol

2、服务暴露的相关流程

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    pathResolver.add(url.getServiceKey(), invoker);
    pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);
    PortUnificationExchanger.bind(url, new DefaultPuHandler());
}
  • pathResolver 收集暴露的服务,后续进行使用
  • PortUnificationExchanger 开启端口进行监听 (和 dubbo协议类似)
org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer#doOpen() {
   bootstrap.group(bossGroup, workerGroup)
    .channel(NettyEventLoopFactory.serverSocketChannelClass())
    .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Do not add idle state handler here, because it should be added in the protocol handler.
            final ChannelPipeline p = ch.pipeline();
            final NettyPortUnificationServerHandler puHandler;
            puHandler = new NettyPortUnificationServerHandler(getUrl(), sslContext, true, getProtocols(),
                NettyPortUnificationServer.this, NettyPortUnificationServer.this.dubboChannels,
                getSupportedUrls(), getSupportedHandlers());
            p.addLast("negotiation-protocol", puHandler);
        }
    }); 
}

这里依然使用netty进行请求处理后续大量的异步都是依赖 NettyPortUnificationServerHandler 这个类展开的。

3、provider处理请求的相应流程

1、 channelHandler配置

到达TripleHttp2FrameServerHandler之后,就开始准备进行service的调用了,trip使用的是 grpc 和 http2的协议 传过来的报文 分为两种类型 请求头,和请求体,所以这里相应的有两部部分的处理逻辑到这一步还是使用的 netty的 handler链是由netty进行的调用

2、请求头的报文数据

3、请求体的报文数据

4、调用实现

拿到请求请求头和请求体后,基本就可进行实际的方法调用了,这边在拿到请求头和请求体时会进行不同的操作,而且都是以异步的方式进行了,不在是netty链路上【有相关的配置应该可以指定使用哪部分线程池】

a、拿到请求头的操作
 public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
    TripleServerStream tripleServerStream = new TripleServerStream(ctx.channel(),
        frameworkModel, executor,
        pathResolver, acceptEncoding, filters);
    ctx.channel().attr(SERVER_STREAM_KEY).set(tripleServerStream);
    tripleServerStream.transportObserver.onHeader(msg.headers(), msg.isEndStream());
}

@Override
public void onHeader(Http2Headers headers, boolean endStream) {
    executor.execute(() -> processHeader(headers, endStream));
}

private void processHeader(Http2Headers headers, boolean endStream) {
....
  Invoker<?> invoker = getInvoker(headers, serviceName);
  ServerStream.Listener  listener = new ReflectionAbstractServerCall(invoker, TripleServerStream.this,
        frameworkModel, acceptEncoding, serviceName, originalMethodName, filters,
        executor);

   // must before onHeader
   Deframer deframer = new TriDecoder(deCompressor, new ServerDecoderListener(listener));
   listener.onHeader(requestMetadata);
}

protected void startCall() {
    RpcInvocation invocation = buildInvocation(methodDescriptor);
    ServerCall.Listener listener = startInternalCall(invocation, methodDescriptor, invoker);
}

protected ServerCall.Listener startInternalCall() {
  switch (methodDescriptor.getRpcType()) {
    case UNARY:
        listener = new UnaryServerCallListener(invocation, invoker, responseObserver);
        request(2);
        break;
    case SERVER_STREAM:
        listener = new ServerStreamServerCallListener(invocation, invoker,
            responseObserver);
        request(2);
        break;
    case BI_STREAM:
    case CLIENT_STREAM:
        listener = new BiStreamServerCallListener(invocation, invoker,
            responseObserver);
        request(1);
        break;
    default:
        throw new IllegalStateException("Can not reach here");
}
return listener;  
}

上面的一系列操作主要是进行服务调用的前的数据准备以及相关的类封装

异步调用 processHeader 方法主要完成 以下几件事
  • getInvoker(headers, serviceName) 找到实际要调用的invoker
  • ReflectionAbstractServerCall 这个实际上实现了两个接口 ServerStream.Listener ,ServerCall 这里是为了 构造 ServerDecoderListener 所以使用 ServerStream.Listener 进行承接
  •  创建 Deframer的实现类 TriDecoder

明确一下:

   ServerStream.Listener listener

    Deframer deframer

     这两个类实例都在 TripleServerStream 的实例里面 这里是准备好了后续调用所需的类数据了

ServerStream.Listener.onHeader方法调用

这个方法调用主要是为了生成 ServerCall.Listener 这个生成的实例是属于 ReflectionAbstractServerCall 这个类实例的

所以 进过上面方法的调用 类的从属关系如下:

b、 拿到请求体操作
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler#onDataRead
  org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerTransportObserver#onData
   org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerTransportObserver#doOnData
     org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#deframe
       org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#deliver
         org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#processBody
           org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder.Listener#onRawMessage
              org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerStream.Listener#onRawMessage
                org.apache.dubbo.rpc.protocol.tri.stream.Stream.Listener#onMessage
                  org.apache.dubbo.rpc.protocol.tri.call.UnaryServerCallListener#onMessage
                     org.apache.dubbo.rpc.RpcInvocation#setArguments
         org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder.Listener#close
            org.apache.dubbo.rpc.protocol.tri.stream.ServerStream.Listener#onComplete
              org.apache.dubbo.rpc.protocol.tri.call.ServerCall.Listener#onComplete
               org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCallListener#invoke
                 org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCallListener#onReturn  返回数据

总之数据就是一层一层往进传

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/706623.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3 父子组件传值 记录

最近这个组件之间传值用的较多&#xff0c;我这该死的记性&#xff0c;总给忘记写法&#xff0c;特此记录下 第一种 父传子 补充&#xff1a;LeftView.vue 是父组件&#xff1b; Video.vue 是子组件 第二种 子传父 Video.vue 子组件 第一步 引入&#xff1a; import { de…

Linux搭建Discuz论坛

环境&#xff1a;redhat 9 mysql 8 Discuz 3.5 题目要求&#xff1a;在 bbs.example.com 主机上创建 Discuz 论坛&#xff0c;数据库服务器使用 db.example.com 主机的 bbs 数据库实例&#xff0c;该实例由 MySQL数据库软件提供服务。 题目要求没有说是在一台虚拟机…

PostgreSQL学习笔记

目录 一、PostgreSQL安装 1、下载 2、安装 二、PostgreSQL操作 1、数据库操作 2、表操作 3、数据操作 一、PostgreSQL安装 本章节以windows系统安装为例&#xff0c;讲解PostgreSQL 15.0的安装过程。 1、下载 访问PostgreSQL官方网站&#xff0c;下载对应的安装包&am…

Qt/C++编写超精美自定义控件(历时9年更新迭代/超202个控件/祖传原创)

一、前言 无论是哪一门开发框架&#xff0c;如果涉及到UI这块&#xff0c;肯定需要用到自定义控件&#xff0c;越复杂功能越多的项目&#xff0c;自定义控件的数量就越多&#xff0c;最开始的时候可能每个自定义控件都针对特定的应用场景&#xff0c;甚至里面带了特定的场景的…

多元回归预测 | Matlab基于麻雀算法(SSA)优化混合核极限学习机HKELM回归预测, SSA-HKELM数据回归预测,多变量输入模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元回归预测 | Matlab基于麻雀算法(SSA)优化混合核极限学习机HKELM回归预测, SSA-HKELM数据回归预测,多变量输入模型 评价指标包括:MAE、RMSE和R2等,代码质量极高,方便学习和替换数据。要求2018版本及以上。 …

Idea中使用Git详细教学

目录 一、配置 Git 二、创建项目远程仓库 三、初始化本地仓库 方法一&#xff1a; 方法二&#xff1a; 四、连接远程仓库 五、提交与拉取到本地仓库 六、推送到远程仓库 七、克隆远程仓库到本地 方法一&#xff1a; 方法二&#xff1a; 八、Git分支操作 一、配置 G…

GAMES101笔记 Lecture07 Shading1(Illumination, Shading and Graphics Pipeline)

目录 Visibility / Occlusion(可见性 or 遮挡)Painters Algorithm(画家算法)Z-Buffer(深度缓冲算法) Shading(着色)A Simple Shading Model(Blinn-Phong Reflectance Model)一个简单的着色模型&#xff1a;Blinn-Phong反射模型Diffuse Reflection(漫反射) 参考资源 Visibility …

Learn Mongodb了解DB数据库 ①

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; PHP MYSQL &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &#x1f44…

flash attention论文及源码学习

​ 论文 attention计算公式如下 传统实现需要将S和P都存到HBM&#xff0c;需要占用 O ( N 2 ) O(N^{2}) O(N2)内存&#xff0c;计算流程为 因此前向HBM访存为 O ( N d N 2 ) O(Nd N^2) O(NdN2)&#xff0c;通常N远大于d&#xff0c;GPT2中N1024&#xff0c;d64。HBM带宽…

#10043. 「一本通 2.2 例 1」剪花布条(内附封面)

题目描述 原题来自&#xff1a;HDU 2087 一块花布条&#xff0c;里面有些图案&#xff0c;另有一块直接可用的小饰条&#xff0c;里面也有一些图案。对于给定的花布条和小饰条&#xff0c;计算一下能从花布条中尽可能剪出几块小饰条来呢&#xff1f; 输入格式 输入数据为多…

23年6月1日软著又面临改革,个人加分评职称和企业申报项目加分的软件著作权登记证书该如何申请?

23年6月1号&#xff0c;国家版权局对软件著作权的申请又做出了改革&#xff0c;本次改革的主要内容是全面普及线上办公。申请人无需向中心递交或邮寄登记申请纸介质材料&#xff0c;“足不出户”即可完成版权登记。 软件著作权登记实现无纸化后&#xff0c;申请人在线登记办理…

1.2g可视化大屏项目分享【包含数字孪生、视频监控、智慧城市、智慧交通等】

1.2g可视化大屏项目分享【包含数字孪生、视频监控、智慧城市、智慧交通等】 链接&#xff1a;https://pan.baidu.com/s/1KSNll7b6bVoVPPqcQmNKeQ 提取码&#xff1a;w13x

Android 图形系统-图解和初步探究

Android 图形系统-图解和初步探究_猎羽的博客-CSDN博客https://blog.csdn.net/feather_wch/article/details/131486729 Android图形系统 2023-7-1 问题&#xff1a;如何将一帧画面显示到屏幕上&#xff1f; 绘制流程 Activity代码 Window的结构 绘制流程 Activity启动后&a…

JDK 动态代理为什么只能代理有接口的类?

嗯&#xff0c;这个问题的核心本质&#xff0c;是 JDK 动态代理本身的机制来决定的。 首先&#xff0c;在 Java 里面&#xff0c;动态代理是通过 Proxy.newProxyInstance()方法来实现的&#xff0c;它需 要传入被动态代理的接口类。 之所以要传入接口&#xff0c;不能传入类&a…

MYSQL增删改语句

INSERT 语法: 单行插入 INSERT INTO table_name (column_1, column_2, ...) VALUES (value_1, value_2, ...); 多行插入 INSERT INTO table_name (column_1, column_2, ...) VALUES (value_11, value_12, ...),(value_21, value_22, ...)...; INSERT INTO 和 VALUES都是关键词 …

libGL.so.1: cannot open shared object file: No such file or directory

不适用docker环境&#xff0c;在conda虚拟环境中出现如下错误&#xff1a; 解决办法&#xff1a; 参考资料 【解决方法】libGL.so.1: cannot open shared object file: No such file or directory

高性能分布式缓存Redis(三) 扩展应用

一、分布式锁 在并发编程中&#xff0c;通过锁&#xff0c;来避免由于竞争而造成的数据不一致问题 1.1、高并发下单超卖问题 Autowired RedisTemplate<String, String> redisTemplate;String key "maotai20210319001";//茅台商品编号ScheduledExecutorServ…

【线程池】史上最全的ScheduledThreadPoolExecutor源码分析

目录 一、简介 1.1 继承关系 1.2 使用 1.3 例子 二、源码分析 2.1 构造方法 2.2 主要的四种提交执行任务的方法 2.3 内部类 ScheduledFutureTask 2.3 scheduleAtFixedRate()方法 2.4 delayedExecute()方法 2.5 ScheduledFutureTask类的run()方法 2.6 内部类 Delaye…

Java设计模式(九)—— 工厂模式1

系列文章目录 披萨订购—传统方式 文章目录 系列文章目录前言一、传统方式案例1.具体需求案例2.传统方式实现3.传统方式优缺点 总结 前言 Hello&#xff0c;小伙伴们&#xff0c;欢迎来到柚子的博客~让我们一起成长吧o(&#xffe3;▽&#xffe3;)ブ 提示&#xff1a;以下是…

Java框架学习(一)JavaWeb基础:Maven、Spring、Tomcat、Mybatis、Springboot

文章目录 MavenMaven仓库Maven坐标为什么Maven进行了依赖管理&#xff0c;依然会出现依赖冲突&#xff1f;处理依赖冲突的手段是什么&#xff1f;详细讲讲scope依赖范围Maven的生命周期Maven高级分模块设计继承版本锁定聚合Maven的继承与聚合的异同私服 Tomcatservlet 分层解耦…