一、背景
早期学习和使用Dubbo的时候(那时候Dubbo还没成为Apache顶级项目),写过一些源码解读,但随着Dubbo发生了翻天覆地的变化,那些文章早已过时,所以现在计划针对最新的Apache Dubbo源码来进行“阅读理解”,希望和大家一起再探Dubbo的实现。由于能力有限,如果文章有错误的地方,欢迎大家留言指正。
本期的主题是Dubbo如何使用Netty4构建RPC来通讯。
二、Server端视角
我们看看作为服务提供方,Apache Dubbo是如何使用Netty4的。
2.1 Netty的线程组
线程组作为Netty的Reactor设计核心组件,在这里自然少不了,我们看到org.apache.dubbo.remoting.transport.netty4.NettyServer中有如下两个方法:
int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
String IO_THREADS_KEY = "iothreads";
String EVENT_LOOP_BOSS_POOL_NAME = "NettyServerBoss";
String EVENT_LOOP_WORKER_POOL_NAME = "NettyServerWorker";
protected EventLoopGroup createBossGroup() {
return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
}
protected EventLoopGroup createWorkerGroup() {
return NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
EVENT_LOOP_WORKER_POOL_NAME);
}
分别用来构建Boss线程组和Worker线程组,Boss线程组负责创建连接,连接创建成功后由Worker线程组来负责处理和发送请求。注意Boss线程组只设置了1个线程,而且没有设置修改接口,通常1个也是够用的,如果设置成可配的会更好。而Worker线程组默认值是当前可用核数和32中取最小值,注意,是最小值,不是最大值,意味着如果你单个Provider实例上可用的核超过32,那么一定要设置IO_THREADS_KEY,否则可能无法达到最大吞吐量(特别是IO密集型应用)。上面代码也给出了这两个线程组的线程池的名称,如果不清楚现在跑的是多少,可以jstack命令看下。注意,为了阐述方便,上面的四个属性定义来自org.apache.dubbo.remoting.Constants。
2.1 Netty的ChannelHandler
Netty的可扩展性主要来自其核心组件之一ChannelHandler,ChannelHandler能帮助我们解决拆包&粘包、协议编解码、权限校验等RPC常遇到的问题。那我们来看看现在Dubbo的Provider端用到了哪些ChannelHandler,同样,我们看NettyServer#initServerBootstrap方法:
protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
bootstrap
.group(bossGroup, workerGroup)
// 根据是否支持EPoll来返回EpollServerSocketChannel或者NioServerSocketChannel
.channel(NettyEventLoopFactory.serverSocketChannelClass())
// SO_REUSEADDR设置为true表示允许重用本地地址和端口。如果服务器意外关闭后再次启动,可以立即绑定到之前使用的地址和端口。
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
// 表示禁用Nagle算法。Nagle算法会将小的网络包合并成较大的包来提高网络效率,但会增加数据传输的延迟。一般对延迟敏感的场景都会禁用Nagle算法。
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
// 开启TCP的心跳机制,用于检测连接是否还活着
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
// PooledByteBufAllocator是Netty提供的一种内存分配器实现,它可以重用ByteBuf对象的内存,提高内存的利用率和性能。
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int closeTimeout = UrlUtils.getCloseTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
// 处理HTTPS
ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
ch.pipeline()
// 解码,读数据的时候用
.addLast("decoder", adapter.getDecoder())
// 编码,写数据的时候用
.addLast("encoder", adapter.getEncoder())
// 注意,这是Netty提供的ChannelHandler
.addLast("server-idle-handler", new IdleStateHandler(0, 0, closeTimeout, MILLISECONDS))
// NettyServerHandler可是核心
.addLast("handler", nettyServerHandler);
}
});
}
我们重点关注下childHandler中添加的ChannelHandler。在这之前,我们先对Channel中的ChannelPipeline和ChanelHandler进行简单介绍,我们知道每个连接对应一个Channel,而每个Channel对应一个ChannelPipeline,用来组织和管理连接上面的处理器ChannelHandler(责任链的方式)。ChannelHandler分为ChannelInboundHandler和ChannelOutboundHandler两大类,分别管理入站(读)和出站(写)的流程定制,当然,为了使用方便,Netty提供了很多开箱即用的ChannelHandler,便于我们轻松实现HTTP、WebSocket等协议。
SslServerTlsHandler
这是上面通过childHandler添加的第一个ChannelHandler(它属于Netty中ByteToMessageDecoder的子类,ByteToMessageDecoder的主要用途是将接收到的字节数据解码为消息对象,并将解码后的消息对象传递给下一个处理器进行后续的业务处理。继承它可以方便的处理多种数据格式,例如二进制数据、文本数据等),看这Handler的名称就知道是做什么的,用来进行SSL|TLS加解密。在其实现的decode方法中,如果发现ByteBuf是支持SSL|TLS,那么会立马在当前Channel的ChannelPipeline中加入SslHandler(Netty自带的支持SSL|TLS握手的ChannelHandler),注意,是加载当前SslServerTlsHandler之后。为了能在SSL|TLS解码后能继续执行SslServerTlsHandler的逻辑,在SslHandler后面又新加了一个SslServerTlsHandler(其中sslDetected=true,说明数据包到这里已经解包完成),用于校验协议。
当然,如果SslServerTlsHandler发现你没有使用SSL|TLS(Netty中的SslHandler#isEncrypted能判断ByteBuf是否进行过加密),那么会直接把自己从ChannelPipeline中删除。这里给出SslServerTlsHandler#decode代码:
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
throws Exception {
// Will use the first five bytes to detect a protocol.
if (byteBuf.readableBytes() < 5) {
return;
}
// SSL|TLS 校验完成后会创建一个新的SslServerTlsHandler,其中sslDetected会设置成true
if (sslDetected) {
return;
}
CertManager certManager =
url.getOrDefaultFrameworkModel().getBeanFactory().getBean(CertManager.class);
ProviderCert providerConnectionConfig = certManager.getProviderConnectionConfig(
url, channelHandlerContext.channel().remoteAddress());
// 如果没启用SSL|TLS,那么获取到的providerConnectionConfig就会是null,从而删除并退出该Handler
if (providerConnectionConfig == null) {
ChannelPipeline p = channelHandlerContext.pipeline();
p.remove(this);
return;
}
if (isSsl(byteBuf)) {
SslContext sslContext = SslContexts.buildServerSslContext(providerConnectionConfig);
enableSsl(channelHandlerContext, sslContext);
return;
}
if (providerConnectionConfig.getAuthPolicy() == AuthPolicy.NONE) {
ChannelPipeline p = channelHandlerContext.pipeline();
p.remove(this);
return;
}
logger.error(INTERNAL_ERROR, "", "", "TLS negotiation failed when trying to accept new connection.");
channelHandlerContext.close();
}
NettyCodecAdapter.InternalDecoder
之前已经给了NettyServer#initServerBootstrap方法,其中可以发现这个内部解码器InternalDecoder是继SslServerTlsHandler的下一个ChannelHandler,由于它和SslServerTlsHandler一样,是ByteToMessageDecoder的子类,所以我们直接看其decode方法:
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
// ChannelBuffer是Dubbo对通道缓冲区的抽象,这里的NettyBackedChannelBuffer就是其Netty缓冲区第一个实现,NettyBackedChannelBuffer内部直接使用的是Netty的ByteBuf
ChannelBuffer message = new NettyBackedChannelBuffer(input);
// Dubbo中的NettyChannel是衔接Netty Channel和Dubbo内部通道抽象的桥梁,其中NettyChannel就有Dubbo的解码器
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
do {
int saveReaderIndex = message.readerIndex();
// 这里的codec默认是DubboCountCodec实例,支持MultiMessage解码,这里返回的msg已经是org.apache.dubbo.remoting.exchange.Request对象
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
// is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
}
}
1. DubboCountCodec(implements Codec2):用来处理MultiMessage类型的消息,Dubbo Consumer可以通过MultiMessage来一次性发送多个请求,但一般没人这么用,大家更愿意传递一个List来让下游Dubbo Provider来批量处理,上述InternalDecoder#decode方法内部调用了其decode方法,decode方法会继续触发其他org.apache.dubbo.remoting.Codec2实现类的调用。
2. DubboCodec(extends ExchangeCodec):DubboCountCodec#decode内部会调用其父类ExchangeCodec#decode方法,主要用于将Dubbo协议中的请求和响应消息进行序列化和反序列化。看到这里,难道Dubbo没用LengthFieldBasedFrameDecoder?这可是Netty中解决TCP拆包、粘包的神器!没有,Dubbo自己在ExchangeCodec#decode方法中自己解决了粘包、拆包问题,通过返回DecodeResult.NEED_MORE_INPUT来表示需要更多数据。ExchangeCodec#decode方法最后又将调用DubboCodec#decodeBody来解码body部分,这里给出Dubbo的协议图:
我们看看DubboCodec解码body部分的代码(注意DubboCodec可以解码也可以编码,这里为了简单起见,只展示解码部分):
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
// decode request.
Request req;
Object data;
req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// get data length.
int len = Bytes.bytes2int(header, 12);
req.setPayload(len);
DecodeableRpcInvocation inv;
// 如果你实现了自定义解析器,那么就使用自定义的
if (customByteAccessor != null) {
inv = customByteAccessor.getRpcInvocation(
channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
} else {
// 否则用Dubbo内置的
inv = new DecodeableRpcInvocation(
frameworkModel,
channel,
req,
new UnsafeByteArrayInputStream(readMessageData(is)),
proto);
}
// DecodeableRpcInvocation来通过协议对应的序列化器来反序列化
inv.decode();
data = inv;
req.setData(data);
return req;
}
可以看到,DubboCodec将使用DecodeableRpcInvocation#decode来完成最终解码过程。需要注意的是,我们可以通过设置DECODE_IN_IO_THREAD_KEY("decode.in.io.thread")参数来让Dubbo决定整个decode操作是否在IO线程中执行,默认是在IO线程执行。
2. DecodeableRpcInvocation(extends RpcInvocation):
前面我们说真正的反序列化是在该类中实现的,我们看下代码:
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
int contentLength = input.available();
this.put(Constants.CONTENT_LENGTH_KEY, contentLength);
// 通过请求中的SerializationId来选择对应的序列化对象进行数据的反序列化
ObjectInput in = CodecSupport.getSerialization(serializationType).deserialize(channel.getUrl(), input);
this.put(SERIALIZATION_ID_KEY, serializationType);
// 设置Dubbo版本号
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(DUBBO_VERSION_KEY, dubboVersion);
// 设置需要调用的接口标识
String path = in.readUTF();
setAttachment(PATH_KEY, path);
String version = in.readUTF();
setAttachment(VERSION_KEY, version);
// Do provider-level payload checks.
String keyWithoutGroup = keyWithoutGroup(path, version);
// 为了防止数据过载,在Dubbo Consumer端可以设置payLoad,这里检查数据是否超过payLoad限制
checkPayload(keyWithoutGroup);
// 设置需要调用的方法标识
setMethodName(in.readUTF());
// 参数类型的全限定名,如果有多个参数,通过;分隔
String desc = in.readUTF();
setParameterTypesDesc(desc);
ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
try {
Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
if (desc.length() > 0) {
// 根据参数描述来拿到对应的Class对象
pts = drawPts(path, version, desc, pts);
// 根据参数的Class和值,解析到真正的参数对象
args = drawArgs(in, pts);
}
setParameterTypes(pts);
Map<String, Object> map = in.readAttachments();
if (CollectionUtils.isNotEmptyMap(map)) {
addObjectAttachments(map);
}
decodeArgument(channel, pts, args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
Thread.currentThread().setContextClassLoader(originClassLoader);
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
我们可以看到,先是通过CodecSupport.getSerialization来获取对应的序列化对象,然后通过它来反序列化数据(我们可以通过看org.apache.dubbo.common.serialize.Serialization有哪些实现类来了解Dubbo默认支持的协议类型)。
NettyServerHandler
当轮到NettyServerHandler执行的时候,由于前面的ChannelHandler处理,请求数据已经反序列化成了org.apache.dubbo.remoting.exchange.Request对象,到了NettyServerHandler#received内部,就轮到一些列 org.apache.dubbo.remoting.ChannelHandler 的子类来处理了:
MultiMessageHandler
如果是MultiMessage类型的消息,那么for循环去调用内部handler去处理,否则只调用handler处理一次。
HeartbeatHandler
在Channel上设置读的时间戳,便于后续判断该Channel是否存活,同时判断请求是不是HeartBeatRequest类型,如果是,则直接应答。否则继续执行下一个handler。
AllChannelHandler
根据请求的url来获取处理的线程池,然后将请求和DecodeHandler实例构造成ChannelEventRunnable对象来扔进线程池中执行。
@Override
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService executor = getPreferredExecutorService(message);
try {
// 这里的handler是DecodeHandler对象
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if (message instanceof Request && t instanceof RejectedExecutionException) {
sendFeedback(channel, (Request) message, t);
return;
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
DecodeHandler
前面说了AllChannelHandler之后的逻辑是在另一个线程中执行的,毫无疑问,在这个新的线程中,主要是用来调用DecodeHandler的来进行解码,在Dubbo Provider接收客户端请求的场景下,调用的是received方法:
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 如果之前设置的不是在IO线程中解码,那么就在这个线程中解码
if (message instanceof Decodeable) {
decode(message);
}
// 如果是Request对象,那么得对消息负载(真正的请求对象)进行解码
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
// handler是HeaderExchangeHandler对象
handler.received(channel, message);
}
HeaderExchangeHandler
该Handler根据请求的交互方式来决定如何处理请求(例如有的交互方式是不需要应答的),处理真正调用,比如会拿Request.data(RpcInvocation),最终会去调用DubboProtocol#ExchangeHandler的reply方法来拿到一个异步结果对象,并设置完成时候res的状态码和结果的赋值:
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// find handler by message class.
Object msg = req.getData();
try {
// handler是DubboProtocol#ExchangeHandler对象
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn(
TRANSPORT_FAILED_RESPONSE,
"",
"",
"Send result to consumer failed, channel is " + channel + ", msg is " + e);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}