在Seata中,TM,RM与TC都需要进行跨进程的网络调用,通常来说就会需要RPC来支持远程调用,而Seata内部就有自身基于Netty的RPC实现,这里我们就来看下Seata是如何进行RPC设计与实现的
RPC整体设计
抽象基类AbstractNettyRemoting
该类是整个RPC设计的一个顶层抽象类,其主要实现了同步发送与异步发送的功能,此时在这里还没区分客户端与服务端,因为无论是客户端还是服务端,双方都应该有发送数据的功能
(1)同步请求sendSync
/**
* 发送一个同步的rpc请求
*
* @param channel netty的channel对象
* @param rpcMessage rpc的消息载体对象
* @param timeoutMillis rpc超时时间
* @return 响应的数据对象
* @throws TimeoutException 请求超时异常
*/
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
if (timeoutMillis <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
if (channel == null) {
LOGGER.warn("sendSync nothing, caused by null channel.");
return null;
}
// 创建一个MessageFuture同步发送辅助对象,并放到futures集合中
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// 检查该channel是否可写(channel中有写缓冲区,如果写缓冲区达到了一定的水位此时的状态就是不可写)
channelWritableCheck(channel, rpcMessage.getBody());
// 获取要发送的目标ip地址
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
// 执行rpc发送前的钩子方法
doBeforeRpcHooks(remoteAddr, rpcMessage);
// 发送数据
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
// 条件成立:说明发送失败
if (!future.isSuccess()) {
// 把这个MessageFuture从futures集合中清除
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
// 设置发送异常类型
messageFuture1.setResultMessage(future.cause());
}
// 销毁channel
destroyChannel(future.channel());
}
});
try {
// 由于netty是异步发送的,所以为了达到同步发送的效果,在这里会进行阻塞,当接收端发送响应之后才会解除阻塞,这是一种常用的异步转同步的方式
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
// 执行rpc发送后的钩子方法
doAfterRpcHooks(remoteAddr, rpcMessage, result);
// 返回响应的数据对象
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
rpcMessage.getBody());
// 等待响应超时,请求已经发出去了
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
}
// 发送请求异常,请求还没发出去
else {
throw new RuntimeException(exx);
}
}
}
- 构建MessageFuture对象,RpcMessage的id作为key,MessageFuture作为value放到futures这个map中
- 检查下发送的目标channel是否是可写的
- 执行钩子的发送前置方法
- 发送数据,如果发送失败就把RpcMessage从futures中进行移除,并且销毁这个channel
- 通过MessageFuture的get方法进行阻塞,同步获取响应结果
这里着重关注一下MessageFuture,通常来说如果我们基于Netty去实现RPC的话,由于Netty是异步发送请求的,所以单纯通过Netty我们是不能做到同步请求的效果的,而一般异步转同步的解决方案就是使用Java的CountdownLatch(RocketMQ是以CountdownLatch实现的)或者CompleteFuture来进行同步阻塞。比如使用CompleteFuture的话具体的实现流程就是创建一个CompleteFuture对象,然后把这个对象先缓存到map中(key为请求id,value为这个CompleteFuture对象),接着调用netty的发送数据,调用完之后通过CompleteFuture的get方法进行同步阻塞,阻塞到什么时候呢?当接收到对方的响应请求的时候,会根据响应请求的请求id(响应请求id与发送请求id是相同的)去从map中找到对应的CompleteFuture对象,然后把响应结果作为参数去调用CompleteFuture对象的complete方法,根据CompleteFuture的特性,此时同步阻塞的get方法就会解除阻塞并获取到上面传入的响应结果。正好Seata实现异步转同步就是使用CompleteFuture的方式,每一个MessageFuture对象中就包装了一个CompleteFuture,具体代码如下:
/**
* 通过该类的辅助去实现Netty发送数据的同步效果
* 具体方案:在每次往接收端发送数据之前,创建一个新的MessageFuture对象然后放到一个集合中,该集合的key是该数据的唯一id,value是这个MessageFuture对象,
* 然后当调用完netty的发送数据api之后就可以利用MessageFuture中的CompletableFuture进行阻塞,阻塞到什么时候呢?
* 当接收端那边接收到发送过来的数据之后,就发送一个对应的响应请求过来,这个响应请求会带着发送的那个数据的唯一id,只要收到这个响应请求之后,根据唯一id去集合中
* 寻找到对应的MessageFuture对象,然后就可以让里面的CompletableFuture对象解除阻塞,这样整个过程就实现了同步发送的效果了。该方案也是异步转同步的典型方案
*
* @author slievrly
*/
public class MessageFuture {
/**
* 同步发送的数据载体对象
*/
private RpcMessage requestMessage;
/**
* 同步发送的超时时间
*/
private long timeout;
private long start = System.currentTimeMillis();
private transient CompletableFuture<Object> origin = new CompletableFuture<>();
/**
* Is timeout boolean.
*
* @return the boolean
*/
public boolean isTimeout() {
return System.currentTimeMillis() - start > timeout;
}
/**
* 同步等待获取结果值
*
* @param timeout 等待超时时间
* @param unit 时间单位
* @return 获取到的结果值
* @throws TimeoutException 超时异常
* @throws InterruptedException 中断异常
*/
public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
Object result = null;
try {
result = origin.get(timeout, unit);
if (result instanceof TimeoutException) {
throw (TimeoutException)result;
}
} catch (ExecutionException e) {
throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
} catch (TimeoutException e) {
throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
}
if (result instanceof RuntimeException) {
throw (RuntimeException)result;
} else if (result instanceof Throwable) {
throw new RuntimeException((Throwable)result);
}
return result;
}
/**
* 设置结果值,有可能是个超时异常 {@link TimeoutException}
*
* @param obj 结果值
*/
public void setResultMessage(Object obj) {
origin.complete(obj);
}
}
MessageFuture的get方法和setResultMessage方法就是分别对CompleteFuture的get方法和complete方法的封装
(2)异步请求sendAsync
/**
* 发送一个异步RPC请求
*
* @param channel channel
* @param rpcMessage rpc请求载体对象
*/
protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
channelWritableCheck(channel, rpcMessage.getBody());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
}
doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
destroyChannel(future.channel());
}
});
}
异步发送就比较简单了,因为Netty本身就是异步发送的,所以这里就不用再做额外的处理了
(3)删除过期的MessageFuture
public void init() {
// 开启一个定时任务,每3s去检查一下所有正在同步发送的MessageFuture,如果发送有超时的,则及时从futures集合中清除
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
MessageFuture future = entry.getValue();
if (future.isTimeout()) {
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String
.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}
上面我们知道了同步请求的实现原理,对于超时的同步请求的MessageFuture如果不进行清理,那么就会一直存在于futures这个map中,在AbstractNettyRemoting中提供了一个init方法,在init方法中会去通过一个定时器每隔3s去map中进行移除掉过期的MessageFuture
(4)处理数据接收
无论是客户端还是服务端,都需要对接收到的数据进行处理,所以在这一层中也对接收数据进行了统一的定义处理,代码如下:
/**
* 接收rpc请求
*
* @param ctx ChannelHandlerContext对象
* @param rpcMessage 接收的消息载体对象
* @throws Exception throws exception process message error.
* @since 1.3.0
*/
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
// 获取消息内容
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 根据消息类型去获取到对应的处理器进行处理
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
// 交给线程池去处理
pair.getSecond().execute(() -> {
try {
// 处理器处理接收到的消息
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to " + jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
当接收到数据之后,会根据数据的类型去从processorTable这个map中获取到对应的pair,processorTable又是个什么东西呢,里面存的pair又是干什么的?processorTable中的key是数据的类型,key是pair,pair里面存了两个对象,一个是RemotingProcessor,另一个是ExecutorService(也就是线程池),因为接收到的消息是需要去走不同的业务处理逻辑的,比如RM接收到TC发送过来的commit消息就走commit的处理逻辑,接收到rollback消息就走rollback的处理逻辑,所以在Seata中为了达到这样的效果,就需要对接收的数据进行类型区分,然后每一种类型的消息都会对应一个处理器RemotingProcessor,而处理器则交由线程池来执行。那么processorTable的key-value又是哪里来的呢?答案就是交由客户端与服务端根据自身的业务需求来具体往processorTable中注册处理器
客户端(TM/RM)
AbstractNettyRemotingClient
该类继承于AbstractNettyRemoting,是Netty客户端的抽象基类,基于AbstractNettyRemoting的发送数据能力之上,实现了通过不同的负载均衡策略去选择服务端地址以及批量发送的功能
(1)服务端负载均衡
protected String loadBalance(String transactionServiceGroup, Object msg) {
InetSocketAddress address = null;
try {
// 获取到transactionServiceGroup对应的TC集群的地址集合
@SuppressWarnings("unchecked")
List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
// 通过负载均衡算法(默认是XIDLoadBalance)选择出一个TC的地址
address = this.doSelect(inetSocketAddressList, msg);
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
}
if (address == null) {
throw new FrameworkException(NoAvailableService);
}
return NetUtil.toStringAddress(address);
}
loadBalance方法主要做的就是通过事务服务组名从注册工厂中去获取到所有的服务端地址,然后再根据具体的负载均衡算法去选择出一个服务端地址并返回,那么这里就会涉及到有两个点了,一个是注册工厂,一个是负载均衡算法,这两个都是有具体不同实现的。对于注册工厂来说,在Seata中提供了多种TC(在Seata中TC就是服务端)注册的方式,比如支持Nacos,Etcd,Eureka,Redis,Zookeeper等等,而负载均衡算法也一样,有轮询,随机等方式,既然有多种实现方式,Seata是怎么进行选择与适配的呢?这里很自然就会想到通过SPI机制,我们通过配置某一种方式然后Seata就可以根据我们的配置去进行选择具体的策略,而之后如果还要扩展其它方式的时候,对于Seata来说就很方便了,只需要对接SPI接口来实现即可。另外还有个问题就是loadBalance方法是每次在发送消息的时候就会去调用,是不是每次都需要去从注册工厂中实时获取到TC的地址呢?其实并没这个必要,因为TC的地址一般都不会频繁变动(TC的上下线也不会很多),所以Seata对于这一点做了些优化,就是通过一个定时任务每隔10s从注册工厂中获取到TC的地址然后建立channel连接并缓存起来,之后每次调用loadBalance去获取TC地址的时候从这个缓存中进行获取即可,定时任务的代码如下:
public void init() {
// 开启一个定时任务,每10s去从注册中心获取到tc集群最新的地址,然后分别与之建立channel连接
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
// 启动netty客户端
clientBootstrap.start();
}
io.seata.core.rpc.netty.NettyClientChannelManager#reconnect
void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
// 从注册服务上获取tc的地址
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
if (CollectionUtils.isEmpty(availList)) {
RegistryService registryService = RegistryFactory.getInstance();
String clusterName = registryService.getServiceGroup(transactionServiceGroup);
if (StringUtils.isBlank(clusterName)) {
LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
transactionServiceGroup);
return;
}
if (!(registryService instanceof FileRegistryServiceImpl)) {
LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
}
return;
}
Set<String> channelAddress = new HashSet<>(availList.size());
try {
// 遍历所有的tc地址
for (String serverAddress : availList) {
try {
// 根据tc的地址获取对应已连接的channel
acquireChannel(serverAddress);
channelAddress.add(serverAddress);
} catch (Exception e) {
LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
serverAddress, e.getMessage(), e);
}
}
} finally {
if (CollectionUtils.isNotEmpty(channelAddress)) {
List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
for (String address : channelAddress) {
String[] array = address.split(":");
aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
}
// 把已经建立好连接的channel缓存起来
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
} else {
RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
}
}
}
public interface RegistryService<T> {
// ......
default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());
}
default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
}
}
定时器会每隔10s调用channel管理器NettyClientChannelManager的reconnect方法,reconnect方法就会去通过注册工厂获取TC的地址并缓存到CURRENT_ADDRESS_MAP这个map中,之后loadBalance方法就会从CURRENT_ADDRESS_MAP获取到TC地址
(2)定义客户端handler
/**
* Netty客户端处理接收消息的handler
*/
@Sharable
class ClientHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
processMessage(ctx, (RpcMessage) msg);
}
// ......
}
在AbstractNettyRemotingClient中会去把ClientHandler作为客户端的handler,当客户端在接收服务端发送过来的数据的时候,会回调channelRead方法,而channelRead方法会调用父类processMessage方法
(3)注册处理器
public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(requestCode, pair);
}
定义了registerProcessor方法,子类可以通过该方法往processorTable中进行注册客户端的处理器
(4)批量发送
Seata并没有开放批量请求的api给用户去使用,也就是说我们并不能去把多个RPC请求作为参数去进行发送,整个批量发送的机制是在Seata底层去进行的,代码如下:
io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object)
// 条件成立:说明允许发送批量请求
if (this.isEnableClientBatchSendRequest()) {
// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// 把RPC请求放到队列中
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
// 条件成立:说明此时异步线程处于空闲状态,此时唤醒异步线程取进行批量请求发送
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}",
exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
}
在客户端发送同步请求的时候,Seata还支持批量请求的发送,可以看到,上面的代码中在执行同步发送的时候会判断是否开启了批量请求,如果开启了就把RPC请求放到一个队列中(每一个发送目标地址对应一个队列),然后再通过MessageFuture进行同步阻塞。那么放到队列中有什么用呢?通常看到把某个东西放到队列中的,大多数都是通过异步线程从队列中去取然后执行的,果然,在AbstractNettyRemotingClient中确实开启了一个异步线程
public void init() {
// ......
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
// ......
}
private class MergedSendRunnable implements Runnable {
@Override
public void run() {
@Override
public void run() {
while (true) {
// 等待唤醒,避免一直死循环占用cpu资源
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
// 设置为正在批量发送中
isSending = true;
// 遍历所有的批量请求
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}
// 把要发送到同一个目标地址的请求都放到MergedWarpMessage请求中
MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
// 获取与目标地址的channel连接
sendChannel = clientChannelManager.acquireChannel(address);
// 把这个批量请求异步发送给服务端
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
// 发送完之后恢复标志位
isSending = false;
}
}
// ......
}
可以看到,异步线程会去从队列中获取到刚才我们放进去的RPC请求,然后把发送到同一个地址的RPC请求都汇集到一个MergedWarpMessage请求中,再把MergedWarpMessage请求异步发送到服务端。那么服务端拿到这个批量请求之后会怎样呢?接下来我们去看下服务端的代码:
io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
// 获取到请求对象
Object message = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (!(message instanceof AbstractMessage)) {
return;
}
// 条件成立:说明是批量发送
if (message instanceof MergedWarpMessage) {
// 条件成立:服务端开启了批量响应
if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
// 获取批量发送的请求
List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
// 获取批量发送的请求id
List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
// 遍历所有的批量请求并进行处理
for (int i = 0; i < msgs.size(); i++) {
AbstractMessage msg = msgs.get(i);
int msgId = msgIds.get(i);
if (PARALLEL_REQUEST_HANDLE) {
CompletableFuture.runAsync(
() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
} else {
handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
}
}
} else {
// 批量请求都处理完得到的结果集合
List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();
List<CompletableFuture<Void>> completableFutures = null;
for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
if (PARALLEL_REQUEST_HANDLE) {
if (completableFutures == null) {
completableFutures = new ArrayList<>();
}
int finalI = i;
completableFutures.add(CompletableFuture.runAsync(() -> {
results.add(finalI, handleRequestsByMergedWarpMessage(
((MergedWarpMessage)message).msgs.get(finalI), rpcContext));
}));
}
// 条件成立:说明没有开启并行处理请求,此时就对每一个请求进行串行处理
else {
results.add(i,
handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
}
}
if (CollectionUtils.isNotEmpty(completableFutures)) {
try {
// 多线程并行处理请求
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("handle request error: {}", e.getMessage(), e);
}
}
// 创建一个MergeResultMessage请求,把结果集合都放进去该请求中然后异步发送给客户端
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
}
}
// 条件成立:说明发送过来的是单个请求
else {
final AbstractMessage msg = (AbstractMessage) message;
// 处理这个请求,得到处理结果
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
// 响应这个处理结果
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
}
}
当服务端接收到MergedWarpMessage请求的时候,会有两种处理方式:
- 服务端开启了批量响应并且客户端版本大于等于1.5:
/**
* 发送批量响应请求的定时任务
*
* @since 1.5.0
*/
private class BatchResponseRunnable implements Runnable {
@Override
public void run() {
while (true) {
synchronized (batchResponseLock) {
try {
batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS);
} catch (InterruptedException e) {
LOGGER.error("BatchResponseRunnable Interrupted error", e);
}
}
isResponding = true;
basketMap.forEach((channel, msgQueue) -> {
if (msgQueue.isEmpty()) {
return;
}
// Because the [serialization,compressor,rpcMessageId,headMap] of the response
// needs to be the same as the [serialization,compressor,rpcMessageId,headMap] of the request.
// Assemble by grouping according to the [serialization,compressor,rpcMessageId,headMap] dimensions.
Map<ClientRequestRpcInfo, BatchResultMessage> batchResultMessageMap = new HashMap<>();
while (!msgQueue.isEmpty()) {
QueueItem item = msgQueue.poll();
BatchResultMessage batchResultMessage = CollectionUtils.computeIfAbsent(batchResultMessageMap,
new ClientRequestRpcInfo(item.getRpcMessage()),
key -> new BatchResultMessage());
batchResultMessage.getResultMessages().add(item.getResultMessage());
batchResultMessage.getMsgIds().add(item.getMsgId());
}
batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) ->
remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo),
channel, batchResultMessage));
});
isResponding = false;
}
}
}
把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,然后把这些结果以同一个channel为维度放在一个队列中,服务端会开启一个线程从每一个channel的队列获取执行结果放到一个BatchResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端
- 服务端没有开启批量响应或者客户端版本小于1.5:
把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,把这些结果都放到一个MergeResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端
可以看到,其实这两种方式的差异并不大,主要的区别在于第一种方式响应给客户端的请求是由异步线程去做的,第二种方式响应给客户端是由processor本身的执行线程去做的
RmNettyRemotingClient
注册RM端的处理器
/**
* 注册RM端的处理器
*/
private void registerProcessor() {
// 注册处理分支事务commit的handler
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
// 注册处理分支事务rollback的handler
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
// 处理处理undoLog的handler
RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 注册处理接收TC端响应的handler
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
// 注册处理心跳请求的handler
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
TmNettyRemotingClient
注册TM端的处理器
/**
* 注册处理器
*/
private void registerProcessor() {
// 注册TC响应数据过来的处理器
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
// 注册心跳接收的处理器
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
服务端(TC)
抽象基类AbstractNettyRemotingServer
Netty服务启动
public void init() {
super.init();
// 启动netty服务
serverBootstrap.start();
}
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
super(messageExecutor);
serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
// 设置netty的接收handler
serverBootstrap.setChannelHandlers(new ServerHandler());
}
重写父类的init方法,在启动的时候会调用init方法来启动一个Netty服务。那么什么时候会去调用init方法呢?答案是在Server类的start方法中
public class Server {
public static void start(String[] args) {
// ......
nettyRemotingServer.init();
}
}
@Component
public class ServerRunner implements CommandLineRunner, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);
private boolean started = Boolean.FALSE;
private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();
public static void addDisposable(Disposable disposable) {
DISPOSABLE_LIST.add(disposable);
}
@Override
public void run(String... args) {
try {
long start = System.currentTimeMillis();
Server.start(args);
started = true;
long cost = System.currentTimeMillis() - start;
LOGGER.info("seata server started in {} millSeconds", cost);
} catch (Throwable e) {
started = Boolean.FALSE;
LOGGER.error("seata server start error: {} ", e.getMessage(), e);
System.exit(-1);
}
}
// ......
}
而io.seata.server.Server#start方法又是被io.seata.server.ServerRunner#run所调用,而ServerRunner又实现了springboot的CommandLineRunner接口,基于springboot的扩展机制,当容器启动完成之后就会回调所有的CommandLineRunner接口的run方法,从而在服务启动的时候间接地去启动Netty服务
NettyRemotingServer
注册服务端的处理器
/**
* 注册处理器
*/
private void registerProcessor() {
// 注册处理请求消息的处理器
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
ShutdownHook.getInstance().addDisposable(onRequestProcessor);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 注册处理响应请求的处理器
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
// 注册处理RM注册请求的处理器
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 注册处理TM注册请求的处理器
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 注册处理心跳消息的处理器
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}