I. Maven dependencies
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>4.1.90.Final</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>3.1.8</version>
<!-- <version>3.2.2</version>-->
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.0.0-rc-2</version>
</dependency>
II. Demo code
public interface TestTripleService {
String sayHello(String name);
}
public class TestTripleServiceImpl implements TestTripleService{
@Override
public String sayHello(String name) {
System.out.println(Thread.currentThread().getName() + " call :" + name);
return "hello :" + name;
}
}
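import java.io.IOException;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
public class TripleConsumer {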
public static void main(String[] args) throws IOException {
System.setProperty("dubbo.application.logger", "slf4j");
ReferenceConfig<TestTripleService> ref = new ReferenceConfig<>();
ref.setInterface(TestTripleService.class);
//ref.setCheck(false);
ref.setProtocol(CommonConstants.TRIPLE);
// ref.setLazy(true);
ref.setTimeout(100000);
ref.setApplication(new ApplicationConfig("triple-consumer"));
ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2188"));
final TestTripleService tripleService = ref.get();
System.out.println("dubbo ref started");
String result = tripleService.sayHello("123");
System.out.println(Thread.currentThread().getName() + " result :" + result);
}
}
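import java.util.concurrent.CountDownLatch;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;
public class TripleProvider {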
public static void main(String[] args) throws InterruptedException {
System.setProperty("dubbo.application.logger", "slf4j");
ServiceConfig<TestTripleService> service = new ServiceConfig<>();
service.setInterface(TestTripleService.class);
service.setRef(new TestTripleServiceImpl());
// The protocol must be explicitly declared as triple here
service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
service.setApplication(new ApplicationConfig("triple-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2188"));
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
III. Other related configuration
1. JDK 1.8
2. ZooKeeper 3.7.1 (chosen to match the version of the Java client jar)
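IV. Debugging issues
1. I started with Dubbo 3.2.2. The provider started normally, but the consumer failed at startup with what looked like a missing class (the log below shows a NoSuchMethodError). The cause turned out to be a conflict among Dubbo's own Maven dependencies: a class with the same signature but a defective implementation was loaded first.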
10:40:02.675 [main] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@62628e78
10:40:02.772 [NettyClientWorker-6-1] WARN io.netty.channel.ChannelInitializer - Failed to initialize a channel. Closing: [id: 0x961f8cf7]
java.lang.NoSuchMethodError: io.netty.handler.codec.http2.Http2FrameCodecBuilder: method <init>()V not found
at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.<init>(TripleHttp2FrameCodecBuilder.java:32)
at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.fromConnection(TripleHttp2FrameCodecBuilder.java:37)
at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.forClient(TripleHttp2FrameCodecBuilder.java:45)
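When a NoSuchMethodError like this points to a dependency conflict, one way to confirm it is to print which jar the conflicting class was actually loaded from. The following is a minimal sketch of that check; ClassOriginCheck and originOf are illustrative names and not part of Dubbo or Netty.

```java
import java.security.CodeSource;

public class ClassOriginCheck {
    /** Returns the jar or directory a class was loaded from, or a note when that is unknown. */
    static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap class loader (e.g. java.lang.String) have no CodeSource.
        return source == null ? "bootstrap/unknown" : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace; pass another class name as an argument.
        String name = args.length > 0 ? args[0] : "io.netty.handler.codec.http2.Http2FrameCodecBuilder";
        try {
            System.out.println(name + " loaded from " + originOf(Class.forName(name)));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Run with the same classpath as the consumer, the printed location shows which artifact supplies the class. Running mvn dependency:tree then shows which dependency pulls that artifact in.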
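2. After switching to 3.1.8, both the consumer and the provider started normally, but the consumer never received the provider's result. The logs showed that the provider received the consumer's request and printed its log output, but never reached the final invoker call. Tracing the call path showed that an exception was thrown along the way (class com.google.protobuf.Message not found). It looks as if the dependency was never pulled in, and the exception was not printed in the logs. After adding the dependency below, the consumer received the provider's result.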
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.0.0-rc-2</version>
</dependency>
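Because the missing class never showed up in the logs, it can be quicker to check the classpath directly before starting the provider. The following is a minimal sketch of such a check; ClasspathCheck and isPresent are illustrative names and not part of Dubbo.

```java
public class ClasspathCheck {
    /** Returns true when the named class can be located by this class's class loader. */
    static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String required = "com.google.protobuf.Message";
        System.out.println(required + " present: " + isPresent(required));
    }
}
```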
V. Analysis of the underlying Triple call path
1. Entry class: TripleProtocol
2. Service export flow
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
pathResolver.add(url.getServiceKey(), invoker);
pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);
PortUnificationExchanger.bind(url, new DefaultPuHandler());
}
- pathResolver collects the exported services for later lookup
- PortUnificationExchanger opens a port and listens on it (similar to the dubbo protocol)
org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer#doOpen() {
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Do not add idle state handler here, because it should be added in the protocol handler.
final ChannelPipeline p = ch.pipeline();
final NettyPortUnificationServerHandler puHandler;
puHandler = new NettyPortUnificationServerHandler(getUrl(), sslContext, true, getProtocols(),
NettyPortUnificationServer.this, NettyPortUnificationServer.this.dubboChannels,
getSupportedUrls(), getSupportedHandlers());
p.addLast("negotiation-protocol", puHandler);
}
});
}
Netty is still used here to handle requests, and much of the later asynchronous processing is built around the NettyPortUnificationServerHandler class.
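The pathResolver step in export can be thought of as a lookup table from a path to an invoker, with the same invoker registered under two keys. The following is a minimal sketch of that idea only. PathRegistrySketch, Handler, and the example keys are hypothetical and are not Dubbo's actual PathResolver implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PathRegistrySketch {
    /** Hypothetical stand-in for a Dubbo Invoker. */
    interface Handler {
        String invoke(String argument);
    }

    // Thread-safe, because services are registered at export time and looked up per request.
    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();

    void add(String path, Handler handler) {
        handlers.put(path, handler);
    }

    /** Returns the handler registered under the path, or null when there is none. */
    Handler resolve(String path) {
        return handlers.get(path);
    }

    public static void main(String[] args) {
        PathRegistrySketch registry = new PathRegistrySketch();
        Handler hello = name -> "hello :" + name;
        // Same handler under two keys, mirroring the two pathResolver.add calls in export.
        // Both key strings are made up for illustration.
        registry.add("demo/com.example.TestTripleService:1.0.0", hello);
        registry.add("com.example.TestTripleService", hello);
        System.out.println(registry.resolve("com.example.TestTripleService").invoke("123"));
    }
}
```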
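3. How the provider handles a request
1. ChannelHandler configuration
Once a request reaches TripleHttp2FrameServerHandler, preparation for the service invocation begins. Triple uses gRPC over HTTP/2, so an incoming message arrives as two kinds of frames, request headers and request body, and there are two corresponding pieces of handling logic. Up to this point everything still runs in the Netty handler chain and is invoked by Netty.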
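The split into two kinds of frames can be sketched as a dispatch on the frame type. The types below are hypothetical, dependency-free stand-ins for Netty's Http2HeadersFrame and Http2DataFrame, so this only illustrates the shape of the logic and is not the handler's real code.

```java
public class FrameDispatchSketch {
    /** Hypothetical marker type for an incoming HTTP/2 frame. */
    interface Frame {}

    /** Stand-in for a headers frame; carries the :path pseudo-header. */
    static final class HeadersFrame implements Frame {
        final String path;
        HeadersFrame(String path) { this.path = path; }
    }

    /** Stand-in for a data frame; carries part of the request body. */
    static final class DataFrame implements Frame {
        final byte[] payload;
        final boolean endStream;
        DataFrame(byte[] payload, boolean endStream) {
            this.payload = payload;
            this.endStream = endStream;
        }
    }

    /** Routes a frame to the matching branch and describes what was handled. */
    static String dispatch(Frame frame) {
        if (frame instanceof HeadersFrame) {
            return "headers " + ((HeadersFrame) frame).path;
        } else if (frame instanceof DataFrame) {
            DataFrame data = (DataFrame) frame;
            return "data " + data.payload.length + " bytes, endStream=" + data.endStream;
        }
        return "ignored";
    }

    public static void main(String[] args) {
        // The path value is made up for illustration.
        System.out.println(dispatch(new HeadersFrame("/com.example.TestTripleService/sayHello")));
        System.out.println(dispatch(new DataFrame(new byte[8], true)));
    }
}
```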
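2. Request header frame data
3. Request body frame data
4. Invocation implementation
Once the request headers and the request body have been received, the actual method call can essentially proceed. Different operations run when the headers arrive and when the body arrives, and both run asynchronously rather than on the Netty pipeline (there should be a configuration option that selects which thread pool is used).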
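Moving the work off the Netty thread still has to keep the header step ahead of the body step for a given request. The following is a minimal sketch of one way to get that ordering, assuming a single-threaded executor per stream. OffloadSketch is an illustrative name, and the executor Dubbo actually uses may be configured differently.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadSketch {
    /** Submits the header and body steps to a worker thread and returns the order they ran in. */
    static List<String> handle() {
        List<String> events = new CopyOnWriteArrayList<>();
        // A single worker thread runs tasks in submission order, so "header" precedes "body".
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> events.add("header"));
        executor.execute(() -> events.add("body"));
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(handle());
    }
}
```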
a. Handling the request headers
public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {
TripleServerStream tripleServerStream = new TripleServerStream(ctx.channel(),
frameworkModel, executor,
pathResolver, acceptEncoding, filters);
ctx.channel().attr(SERVER_STREAM_KEY).set(tripleServerStream);
tripleServerStream.transportObserver.onHeader(msg.headers(), msg.isEndStream());
}
@Override
public void onHeader(Http2Headers headers, boolean endStream) {
executor.execute(() -> processHeader(headers, endStream));
}
private void processHeader(Http2Headers headers, boolean endStream) {
....
Invoker<?> invoker = getInvoker(headers, serviceName);
ServerStream.Listener listener = new ReflectionAbstractServerCall(invoker, TripleServerStream.this,
frameworkModel, acceptEncoding, serviceName, originalMethodName, filters,
executor);
// must before onHeader
Deframer deframer = new TriDecoder(deCompressor, new ServerDecoderListener(listener));
listener.onHeader(requestMetadata);
}
protected void startCall() {
RpcInvocation invocation = buildInvocation(methodDescriptor);
ServerCall.Listener listener = startInternalCall(invocation, methodDescriptor, invoker);
}
protected ServerCall.Listener startInternalCall() {
switch (methodDescriptor.getRpcType()) {
case UNARY:
listener = new UnaryServerCallListener(invocation, invoker, responseObserver);
request(2);
break;
case SERVER_STREAM:
listener = new ServerStreamServerCallListener(invocation, invoker,
responseObserver);
request(2);
break;
case BI_STREAM:
case CLIENT_STREAM:
listener = new BiStreamServerCallListener(invocation, invoker,
responseObserver);
request(1);
break;
default:
throw new IllegalStateException("Can not reach here");
}
return listener;
}
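The operations above mainly prepare data and wrap the relevant classes ahead of the service call.
The asynchronously invoked processHeader method does the following:
- getInvoker(headers, serviceName) finds the invoker that will actually be called
- ReflectionAbstractServerCall implements two interfaces, ServerStream.Listener and ServerCall. It is held as a ServerStream.Listener here because it is used to construct the ServerDecoderListener
- It creates TriDecoder, the implementation of Deframer
To be clear:
ServerStream.Listener listener
Deframer deframer
Both instances live inside the TripleServerStream instance. At this point the objects needed for the later call are ready.
The call to ServerStream.Listener.onHeader
This call mainly creates the ServerCall.Listener, and the resulting instance belongs to the ReflectionAbstractServerCall instance.
So after the calls above, the ownership relationships between the classes are as follows:
- TripleServerStream holds the ServerStream.Listener (a ReflectionAbstractServerCall) and the Deframer (a TriDecoder)
- ReflectionAbstractServerCall holds the ServerCall.Listener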
b. Handling the request body
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler#onDataRead
org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerTransportObserver#onData
org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerTransportObserver#doOnData
org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#deframe
org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#deliver
org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#processBody
org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder.Listener#onRawMessage
org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerStream.Listener#onRawMessage
org.apache.dubbo.rpc.protocol.tri.stream.Stream.Listener#onMessage
org.apache.dubbo.rpc.protocol.tri.call.UnaryServerCallListener#onMessage
org.apache.dubbo.rpc.RpcInvocation#setArguments
org.apache.dubbo.rpc.protocol.tri.frame.TriDecoder.Listener#close
org.apache.dubbo.rpc.protocol.tri.stream.ServerStream.Listener#onComplete
org.apache.dubbo.rpc.protocol.tri.call.ServerCall.Listener#onComplete
org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCallListener#invoke
org.apache.dubbo.rpc.protocol.tri.call.AbstractServerCallListener#onReturn returns the data
In short, the data is simply passed inward one layer at a time.
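The deframe and processBody steps in the chain above can be illustrated with the standard gRPC message framing: a 1-byte compressed flag, a 4-byte big-endian length, then the payload. The following is a minimal sketch that assumes TriDecoder follows that framing. DeframeSketch and its methods are illustrative names, and compression is deliberately not handled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DeframeSketch {
    // 1 byte for the compressed flag plus 4 bytes for the message length.
    static final int HEADER_LENGTH = 5;

    /** Wraps a payload in an uncompressed gRPC-style frame. */
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
                .put((byte) 0)            // 0 = not compressed
                .putInt(payload.length)   // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    /** Extracts every complete message; an incomplete trailing frame stays in the buffer. */
    static List<byte[]> deframe(ByteBuffer in) {
        List<byte[]> messages = new ArrayList<>();
        while (in.remaining() >= HEADER_LENGTH) {
            in.mark();
            byte compressedFlag = in.get();
            int length = in.getInt();
            if (compressedFlag != 0 || length < 0) {
                throw new IllegalStateException("unsupported frame in this sketch");
            }
            if (in.remaining() < length) {
                in.reset(); // wait for more data before consuming the header
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(payload);
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] framed = frame("123".getBytes(StandardCharsets.UTF_8));
        for (byte[] message : deframe(ByteBuffer.wrap(framed))) {
            System.out.println(new String(message, StandardCharsets.UTF_8));
        }
    }
}
```

In the real call chain, each extracted payload would then be deserialized into the method arguments before RpcInvocation#setArguments is called.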