前言
Dubbo 框架采用分层设计,最底下的 Serialize 层负责把对象序列化为字节序列,再经过 Transport 层网络传输到对端。一次 RPC 调用,在 Dubbo 看来其实就是一段请求报文和一段响应报文的传输过程。
理解Transport
Transport 层即网络传输层,它在 Serialize 的上层,Exchange 的下层,起到一个承上启下的作用。
有很多网络库可以做网络传输,比如 Netty、Mina、甚至是 JDK 原生的 Socket,但是这些库的使用方式和对外接口都不一样,如果直接依赖三方库开发,后续更换实现方案就非常麻烦了,违背了开闭原则。
所以 Transport 层对网络传输做了抽象,它把 Netty、Mina 封装成统一接口,上层面向接口编程,具体实现可以轻松替换。
Transport 层只负责数据的传输,至于要传输什么数据它是不关心的,也不应该关心。传输的数据格式和通信协议息息相关,应该由协议层去定义。
设计实现
Transport 层和 Exchange 层的代码都位于dubbo-remoting
模块,同样的,dubbo-remoting-api
模块只定义抽象接口,其它子模块负责具体实现。
Dubbo 官方内置的 remoting 模块:
<modules>
<module>dubbo-remoting-api</module>
<module>dubbo-remoting-netty</module>
<module>dubbo-remoting-mina</module>
<module>dubbo-remoting-grizzly</module>
<module>dubbo-remoting-p2p</module>
<module>dubbo-remoting-http</module>
<module>dubbo-remoting-zookeeper</module>
<module>dubbo-remoting-netty4</module>
<module>dubbo-remoting-etcd3</module>
</modules>
抽象层
Transport 层的核心SPI接口是 Transporter,默认实现是 Netty。
@SPI("netty")
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
接口定义了两个方法,bind()
用于服务端绑定本地接口,connect()
用于客户端和服务器建立连接。
ChannelHandler 接口用来定义 Channel 事件,Dubbo 目前定义了五个事件,分别是:
- connected:连接事件
- disconnected:连接断开事件
- sent:数据发送事件
- received:数据接收事件
- caught:异常事件
@SPI
public interface ChannelHandler {
void connected(Channel channel) throws RemotingException;
void disconnected(Channel channel) throws RemotingException;
void sent(Channel channel, Object message) throws RemotingException;
void received(Channel channel, Object message) throws RemotingException;
void caught(Channel channel, Throwable exception) throws RemotingException;
}
Channel 接口抽象的是一个tcp连接,它继承自 Endpoint,代表它也是一个端点。既然是连接,那自然就拥有发送数据、主动断开、读写属性等能力。
public interface Channel extends Endpoint {
// 远程地址
InetSocketAddress getRemoteAddress();
// 是否连接
boolean isConnected();
/****属性的读写****/
boolean hasAttribute(String key);
Object getAttribute(String key);
void setAttribute(String key, Object value);
void removeAttribute(String key);
}
public interface Endpoint {
URL getUrl();
// Channel事件处理器
ChannelHandler getChannelHandler();
// 本地地址
InetSocketAddress getLocalAddress();
// 发送数据
void send(Object message) throws RemotingException;
void send(Object message, boolean sent) throws RemotingException;
// 关闭连接
void close();
void close(int timeout);
void startClose();
boolean isClosed();
}
RemotingServer 抽象的是服务器接口,通过绑定本地接口可以获得一个服务器对象,它会维护所有和它建立连接的 Channel。
public interface RemotingServer extends Endpoint, Resetable, IdleSensible {
// 是否绑定
boolean isBound();
// 拿所有连接
Collection<Channel> getChannels();
// 根据远程地址拿连接
Channel getChannel(InetSocketAddress remoteAddress);
@Deprecated
void reset(org.apache.dubbo.common.Parameters parameters);
}
Client 接口抽象的是客户端,它继承自 Channel,所以它也是一个连接,可以向远程发送数据。
public interface Client extends Endpoint, Channel, Resetable, IdleSensible {
// 重连
void reconnect() throws RemotingException;
@Deprecated
void reset(org.apache.dubbo.common.Parameters parameters);
}
除了围绕 Transporter 的这些接口,传输层还有一个很重要的接口。
Codec2 是网络编解码器的抽象接口,我们代码里发送的是 Object,对象本身不能通过网络传输,得经过编码器把它编码为字节序列才能发送。同样的,对端收到的也是一段字节序列,得经过解码器按照相同的规则解码为 Object。
@SPI
public interface Codec2 {
// 编码
@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;
// 解码
@Adaptive({Constants.CODEC_KEY})
Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
enum DecodeResult {
NEED_MORE_INPUT,
SKIP_SOME_INPUT
}
}
实现层
直接看默认实现,基于 Netty4 的 org.apache.dubbo.remoting.transport.netty4.NettyTransporter。
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
}
NettyServer
基于 Netty 实现的服务器类是 NettyServer,继承关系比较复杂,看看每一层构造函数都干了啥。
AbstractPeer 主要是保存 URL 和 ChannelHandler
public AbstractPeer(URL url, ChannelHandler handler) {
this.url = url;
this.handler = handler;
}
AbstractEndpoint 通过 SPI 加载编解码器 Codec2 的实现
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
this.codec = getChannelCodec(url);
this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
AbstractServer 构造函数是个模板方法,调用子类的doOpen
开启服务,然后创建线程池
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 获取绑定的IP和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 最大连接数、连接空闲超时时间
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
doOpen(); // 开启服务
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// 业务线程池
executor = executorRepository.createExecutorIfAbsent(url);
}
NettyServer 是具体实现,主要是基于 Netty 开启服务,常规的 ServerBootstrap 启动流程,我们重点关注的是 Dubbo 对 ChannelPipeline 的编排。
- 如果开启 ssl,会插入一个 SslServerTlsHandler
- 接着插入编解码器
- IdleStateHandler 用于关闭超时闲置的连接
- NettyServerHandler 主要是对我们的业务处理器 handler 再包装了一层
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// Accept线程
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
// IO线程 CPU核心数+1 最大不会超过32
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
// 开启ssl
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
// 编排ChannelPipeline
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
/**
* @see HeaderExchanger#bind(URL, ExchangeHandler)
*/
.addLast("handler", nettyServerHandler);
}
});
// 绑定端口,同步等待完成
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
NettyClient
基于 Netty 实现的客户端类是 org.apache.dubbo.remoting.transport.netty4.NettyClient。
基类是一样的,用于初始化编解码器 Codec2。AbstractClient 会调用子类doOpen
开启客户端,紧接着调用connect
和服务器建立连接。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
// 初始化线程池
initExecutor(url);
doOpen();// 开启客户端
connect(); // 建立连接
}
doOpen
也是常规的 Netty Bootstrap 启动流程,ChannelPipeline 的编排和 Server 端一致。
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(NIO_EVENT_LOOP_GROUP)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(socketChannelClass());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
}
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
if(socksProxyHost != null) {
int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
}
});
}
doConnect
会调用Bootstrap#connect
和服务端建立连接,连接成功后会得到一个 Channel 对象,Dubbo 可以通过它给服务端发数据。
@Override
protected void doConnect() throws Throwable {
ChannelFuture future = bootstrap.connect(getConnectAddress());
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
NettyClient.this.channel = newChannel;
}
}