深入探讨Seata RPC模块的设计与实现

news2024/9/20 8:15:43

在Seata中,TM,RM与TC都需要进行跨进程的网络调用,通常来说就会需要RPC来支持远程调用,而Seata内部就有自身基于Netty的RPC实现,这里我们就来看下Seata是如何进行RPC设计与实现的

 RPC整体设计

 抽象基类AbstractNettyRemoting

 该类是整个RPC设计的一个顶层抽象类,其主要实现了同步发送与异步发送的功能,此时在这里还没区分客户端与服务端,因为无论是客户端还是服务端,双方都应该有发送数据的功能

(1)同步请求sendSync 

/**
 * 发送一个同步的rpc请求
 *
 * @param channel       netty的channel对象
 * @param rpcMessage    rpc的消息载体对象
 * @param timeoutMillis rpc超时时间
 * @return 响应的数据对象
 * @throws TimeoutException 请求超时异常
 */
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    if (timeoutMillis <= 0) {
        throw new FrameworkException("timeout should more than 0ms");
    }
    if (channel == null) {
        LOGGER.warn("sendSync nothing, caused by null channel.");
        return null;
    }

    // 创建一个MessageFuture同步发送辅助对象,并放到futures集合中
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);

    // 检查该channel是否可写(channel中有写缓冲区,如果写缓冲区达到了一定的水位此时的状态就是不可写)
    channelWritableCheck(channel, rpcMessage.getBody());
    // 获取要发送的目标ip地址
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    // 执行rpc发送前的钩子方法
    doBeforeRpcHooks(remoteAddr, rpcMessage);

    // 发送数据
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        // 条件成立:说明发送失败
        if (!future.isSuccess()) {
            // 把这个MessageFuture从futures集合中清除
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                // 设置发送异常类型
                messageFuture1.setResultMessage(future.cause());
            }

            // 销毁channel
            destroyChannel(future.channel());
        }
    });

    try {
        // 由于netty是异步发送的,所以为了达到同步发送的效果,在这里会进行阻塞,当接收端发送响应之后才会解除阻塞,这是一种常用的异步转同步的方式
        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        // 执行rpc发送后的钩子方法
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        // 返回响应的数据对象
        return result;
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
                     rpcMessage.getBody());
        // 等待响应超时,请求已经发出去了
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        }
            // 发送请求异常,请求还没发出去
        else {
            throw new RuntimeException(exx);
        }
    }
}
  1. 构建MessageFuture对象,RpcMessage的id作为key,MessageFuture作为value放到futures这个map中
  2. 检查下发送的目标channel是否是可写的
  3. 执行钩子的发送前置方法
  4. 发送数据,如果发送失败就把RpcMessage从futures中进行移除,并且销毁这个channel
  5. 通过MessageFuture的get方法进行阻塞,同步获取响应结果

这里着重关注一下MessageFuture,通常来说如果我们基于Netty去实现RPC的话,由于Netty是异步发送请求的,所以单纯通过Netty我们是不能做到同步请求的效果的,而一般异步转同步的解决方案就是使用Java的CountdownLatch(RocketMQ是以CountdownLatch实现的)或者CompleteFuture来进行同步阻塞。比如使用CompleteFuture的话具体的实现流程就是创建一个CompleteFuture对象,然后把这个对象先缓存到map中(key为请求id,value为这个CompleteFuture对象),接着调用netty的发送数据,调用完之后通过CompleteFuture的get方法进行同步阻塞,阻塞到什么时候呢?当接收到对方的响应请求的时候,会根据响应请求的请求id(响应请求id与发送请求id是相同的)去从map中找到对应的CompleteFuture对象,然后把响应结果作为参数去调用CompleteFuture对象的complete方法,根据CompleteFuture的特性,此时同步阻塞的get方法就会解除阻塞并获取到上面传入的响应结果。正好Seata实现异步转同步就是使用CompleteFuture的方式,每一个MessageFuture对象中就包装了一个CompleteFuture,具体代码如下: 

/**
 * 通过该类的辅助去实现Netty发送数据的同步效果
 * 具体方案:在每次往接收端发送数据之前,创建一个新的MessageFuture对象然后放到一个集合中,该集合的key是该数据的唯一id,value是这个MessageFuture对象,
 * 然后当调用完netty的发送数据api之后就可以利用MessageFuture中的CompletableFuture进行阻塞,阻塞到什么时候呢?
 * 当接收端那边接收到发送过来的数据之后,就发送一个对应的响应请求过来,这个响应请求会带着发送的那个数据的唯一id,只要收到这个响应请求之后,根据唯一id去集合中
 * 寻找到对应的MessageFuture对象,然后就可以让里面的CompletableFuture对象解除阻塞,这样整个过程就实现了同步发送的效果了。该方案也是异步转同步的典型方案
 *
 * @author slievrly
 */
public class MessageFuture {

    /**
     * 同步发送的数据载体对象
     */
    private RpcMessage requestMessage;

    /**
     * 同步发送的超时时间
     */
    private long timeout;

    private long start = System.currentTimeMillis();

    private transient CompletableFuture<Object> origin = new CompletableFuture<>();

    /**
     * Is timeout boolean.
     *
     * @return the boolean
     */
    public boolean isTimeout() {
        return System.currentTimeMillis() - start > timeout;
    }

    /**
     * 同步等待获取结果值
     *
     * @param timeout 等待超时时间
     * @param unit    时间单位
     * @return 获取到的结果值
     * @throws TimeoutException 超时异常
     * @throws InterruptedException 中断异常
     */
    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result = null;
        try {
            result = origin.get(timeout, unit);
            if (result instanceof TimeoutException) {
                throw (TimeoutException)result;
            }
        } catch (ExecutionException e) {
            throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
        } catch (TimeoutException e) {
            throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
        }

        if (result instanceof RuntimeException) {
            throw (RuntimeException)result;
        } else if (result instanceof Throwable) {
            throw new RuntimeException((Throwable)result);
        }

        return result;
    }

    /**
     * 设置结果值,有可能是个超时异常 {@link TimeoutException}
     *
     * @param obj 结果值
     */
    public void setResultMessage(Object obj) {
        origin.complete(obj);
    }
}

MessageFuture的get方法和setResultMessage方法就是分别对CompleteFuture的get方法和complete方法的封装

(2)异步请求sendAsync 

/**
 * 发送一个异步RPC请求
 *
 * @param channel    channel
 * @param rpcMessage rpc请求载体对象
 */
protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
    channelWritableCheck(channel, rpcMessage.getBody());
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
                     + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
    }

    doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);

    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            destroyChannel(future.channel());
        }
    });
}

异步发送就比较简单了,因为Netty本身就是异步发送的,所以这里就不用再做额外的处理了 

(3)删除过期的MessageFuture 

public void init() {
    // 开启一个定时任务,每3s去检查一下所有正在同步发送的MessageFuture,如果发送有超时的,则及时从futures集合中清除
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                MessageFuture future = entry.getValue();
                if (future.isTimeout()) {
                    futures.remove(entry.getKey());
                    RpcMessage rpcMessage = future.getRequestMessage();
                    future.setResultMessage(new TimeoutException(String
                                                                 .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                    }
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}

上面我们知道了同步请求的实现原理,对于超时的同步请求的MessageFuture如果不进行清理,那么就会一直存在于futures这个map中,在AbstractNettyRemoting中提供了一个init方法,在init方法中会去通过一个定时器每隔3s去map中进行移除掉过期的MessageFuture 

(4)处理数据接收 

无论是客户端还是服务端,都需要对接收到的数据进行处理,所以在这一层中也对接收数据进行了统一的定义处理,代码如下: 

/**
 * 接收rpc请求
 *
 * @param ctx        ChannelHandlerContext对象
 * @param rpcMessage 接收的消息载体对象
 * @throws Exception throws exception process message error.
 * @since 1.3.0
 */
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }

    // 获取消息内容
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;

        // 根据消息类型去获取到对应的处理器进行处理
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    // 交给线程池去处理
                    pair.getSecond().execute(() -> {
                        try {
                            // 处理器处理接收到的消息
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                                 "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}

当接收到数据之后,会根据数据的类型去从processorTable这个map中获取到对应的pair,processorTable又是个什么东西呢,里面存的pair又是干什么的?processorTable中的key是数据的类型,key是pair,pair里面存了两个对象,一个是RemotingProcessor,另一个是ExecutorService(也就是线程池),因为接收到的消息是需要去走不同的业务处理逻辑的,比如RM接收到TC发送过来的commit消息就走commit的处理逻辑,接收到rollback消息就走rollback的处理逻辑,所以在Seata中为了达到这样的效果,就需要对接收的数据进行类型区分,然后每一种类型的消息都会对应一个处理器RemotingProcessor,而处理器则交由线程池来执行。那么processorTable的key-value又是哪里来的呢?答案就是交由客户端与服务端根据自身的业务需求来具体往processorTable中注册处理器

客户端(TM/RM) 

AbstractNettyRemotingClient 

该类继承于AbstractNettyRemoting,是Netty客户端的抽象基类,基于AbstractNettyRemoting的发送数据能力之上,实现了通过不同的负载均衡策略去选择服务端地址以及批量发送的功能 

(1)服务端负载均衡 
protected String loadBalance(String transactionServiceGroup, Object msg) {
    InetSocketAddress address = null;
    try {
        // 获取到transactionServiceGroup对应的TC集群的地址集合
        @SuppressWarnings("unchecked")
        List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
        // 通过负载均衡算法(默认是XIDLoadBalance)选择出一个TC的地址
        address = this.doSelect(inetSocketAddressList, msg);
    } catch (Exception ex) {
        LOGGER.error(ex.getMessage());
    }
    if (address == null) {
        throw new FrameworkException(NoAvailableService);
    }
    return NetUtil.toStringAddress(address);
}

loadBalance方法主要做的就是通过事务服务组名从注册工厂中去获取到所有的服务端地址,然后再根据具体的负载均衡算法去选择出一个服务端地址并返回,那么这里就会涉及到有两个点了,一个是注册工厂,一个是负载均衡算法,这两个都是有具体不同实现的。对于注册工厂来说,在Seata中提供了多种TC(在Seata中TC就是服务端)注册的方式,比如支持Nacos,Etcd,Eureka,Redis,Zookeeper等等,而负载均衡算法也一样,有轮询,随机等方式,既然有多种实现方式,Seata是怎么进行选择与适配的呢?这里很自然就会想到通过SPI机制,我们通过配置某一种方式然后Seata就可以根据我们的配置去进行选择具体的策略,而之后如果还要扩展其它方式的时候,对于Seata来说就很方便了,只需要对接SPI接口来实现即可。另外还有个问题就是loadBalance方法是每次在发送消息的时候就会去调用,是不是每次都需要去从注册工厂中实时获取到TC的地址呢?其实并没这个必要,因为TC的地址一般都不会频繁变动(TC的上下线也不会很多),所以Seata对于这一点做了些优化,就是通过一个定时任务每隔10s从注册工厂中获取到TC的地址然后建立channel连接并缓存起来,之后每次调用loadBalance去获取TC地址的时候从这个缓存中进行获取即可,定时任务的代码如下: 

public void init() {
    // 开启一个定时任务,每10s去从注册中心获取到tc集群最新的地址,然后分别与之建立channel连接
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }
    super.init();

    // 启动netty客户端
    clientBootstrap.start();
}

io.seata.core.rpc.netty.NettyClientChannelManager#reconnect 

void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
    // 从注册服务上获取tc的地址
    availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
    LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
    return;
}
if (CollectionUtils.isEmpty(availList)) {
    RegistryService registryService = RegistryFactory.getInstance();
    String clusterName = registryService.getServiceGroup(transactionServiceGroup);

    if (StringUtils.isBlank(clusterName)) {
        LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",
                     ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,
                     transactionServiceGroup);
        return;
    }

    if (!(registryService instanceof FileRegistryServiceImpl)) {
        LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
    }
    return;
}

Set<String> channelAddress = new HashSet<>(availList.size());
try {
    // 遍历所有的tc地址
    for (String serverAddress : availList) {
        try {
            // 根据tc的地址获取对应已连接的channel
            acquireChannel(serverAddress);
            channelAddress.add(serverAddress);
        } catch (Exception e) {
            LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),
                         serverAddress, e.getMessage(), e);
        }
    }
} finally {
    if (CollectionUtils.isNotEmpty(channelAddress)) {
        List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
        for (String address : channelAddress) {
            String[] array = address.split(":");
            aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
        }
        // 把已经建立好连接的channel缓存起来
        RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
    } else {
        RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
    }
}
}
public interface RegistryService<T> {

    // ......

    default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {
        return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());
    }

    default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,
        List<InetSocketAddress> aliveAddress) {
        return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
    }
    
}

定时器会每隔10s调用channel管理器NettyClientChannelManager的reconnect方法,reconnect方法就会去通过注册工厂获取TC的地址并缓存到CURRENT_ADDRESS_MAP这个map中,之后loadBalance方法就会从CURRENT_ADDRESS_MAP获取到TC地址 

(2)定义客户端handler 
/**
 * Netty客户端处理接收消息的handler
 */
@Sharable
class ClientHandler extends ChannelDuplexHandler {

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        processMessage(ctx, (RpcMessage) msg);
    }

    // ......
   
}

在AbstractNettyRemotingClient中会去把ClientHandler作为客户端的handler,当客户端在接收服务端发送过来的数据的时候,会回调channelRead方法,而channelRead方法会调用父类processMessage方法 

(3)注册处理器 
public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
    Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
    this.processorTable.put(requestCode, pair);
}

定义了registerProcessor方法,子类可以通过该方法往processorTable中进行注册客户端的处理器 

(4)批量发送 

Seata并没有开放批量请求的api给用户去使用,也就是说我们并不能去把多个RPC请求作为参数去进行发送,整个批量发送的机制是在Seata底层去进行的,代码如下: 

io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object) 

// 条件成立:说明允许发送批量请求
if (this.isEnableClientBatchSendRequest()) {

    // send batch message is sync request, needs to create messageFuture and put it in futures.
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);

    // 把RPC请求放到队列中
    BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                                                                       key -> new LinkedBlockingQueue<>());
    if (!basket.offer(rpcMessage)) {
        LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                     serverAddress, rpcMessage);
        return null;
    }
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("offer message: {}", rpcMessage.getBody());
    }

    // 条件成立:说明此时异步线程处于空闲状态,此时唤醒异步线程取进行批量请求发送
    if (!isSending) {
        synchronized (mergeLock) {
            mergeLock.notifyAll();
        }
    }

    try {
        return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}",
                     exx.getMessage(), serverAddress, rpcMessage.getBody());
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        } else {
            throw new RuntimeException(exx);
        }
    }

}

在客户端发送同步请求的时候,Seata还支持批量请求的发送,可以看到,上面的代码中在执行同步发送的时候会判断是否开启了批量请求,如果开启了就把RPC请求放到一个队列中(每一个发送目标地址对应一个队列),然后再通过MessageFuture进行同步阻塞。那么放到队列中有什么用呢?通常看到把某个东西放到队列中的,大多数都是通过异步线程从队列中去取然后执行的,果然,在AbstractNettyRemotingClient中确实开启了一个异步线程 

public void init() {

    // ......
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                          MAX_MERGE_SEND_THREAD,
                                                          KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                          new LinkedBlockingQueue<>(),
                                                          new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }

    // ......
}
private class MergedSendRunnable implements Runnable {

    @Override
    public void run() {
        @Override
        public void run() {
            while (true) {
                // 等待唤醒,避免一直死循环占用cpu资源
                synchronized (mergeLock) {
                    try {
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {
                    }
                }

                // 设置为正在批量发送中
                isSending = true;
                // 遍历所有的批量请求
                basketMap.forEach((address, basket) -> {
                    if (basket.isEmpty()) {
                        return;
                    }

                    // 把要发送到同一个目标地址的请求都放到MergedWarpMessage请求中
                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = null;
                    try {
                        // send batch message is sync request, but there is no need to get the return value.
                        // Since the messageFuture has been created before the message is placed in basketMap,
                        // the return value will be obtained in ClientOnResponseProcessor.
                        // 获取与目标地址的channel连接
                        sendChannel = clientChannelManager.acquireChannel(address);
                        // 把这个批量请求异步发送给服务端
                        AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                            destroyChannel(address, sendChannel);
                        }
                        // fast fail
                        for (Integer msgId : mergeMessage.msgIds) {
                            MessageFuture messageFuture = futures.remove(msgId);
                            if (messageFuture != null) {
                                messageFuture.setResultMessage(
                                    new RuntimeException(String.format("%s is unreachable", address), e));
                            }
                        }
                        LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                    }
                });

                // 发送完之后恢复标志位
                isSending = false;
            }
    }

    // ......
}

可以看到,异步线程会去从队列中获取到刚才我们放进去的RPC请求,然后把发送到同一个地址的RPC请求都汇集到一个MergedWarpMessage请求中,再把MergedWarpMessage请求异步发送到服务端。那么服务端拿到这个批量请求之后会怎样呢?接下来我们去看下服务端的代码: 

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage 

    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        // 获取到请求对象
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());

        if (!(message instanceof AbstractMessage)) {
            return;
        }

        // 条件成立:说明是批量发送
        if (message instanceof MergedWarpMessage) {
            // 条件成立:服务端开启了批量响应
            if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
                && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
                // 获取批量发送的请求
                List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
                // 获取批量发送的请求id
                List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
                // 遍历所有的批量请求并进行处理
                for (int i = 0; i < msgs.size(); i++) {
                    AbstractMessage msg = msgs.get(i);
                    int msgId = msgIds.get(i);
                    if (PARALLEL_REQUEST_HANDLE) {
                        CompletableFuture.runAsync(
                            () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                    } else {
                        handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                    }
                }
            } else {
                // 批量请求都处理完得到的结果集合
                List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();
                List<CompletableFuture<Void>> completableFutures = null;
                for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
                    if (PARALLEL_REQUEST_HANDLE) {
                        if (completableFutures == null) {
                            completableFutures = new ArrayList<>();
                        }
                        int finalI = i;
                        completableFutures.add(CompletableFuture.runAsync(() -> {
                            results.add(finalI, handleRequestsByMergedWarpMessage(
                                ((MergedWarpMessage)message).msgs.get(finalI), rpcContext));
                        }));
                    }
                    // 条件成立:说明没有开启并行处理请求,此时就对每一个请求进行串行处理
                    else {
                        results.add(i,
                            handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
                    }
                }
                if (CollectionUtils.isNotEmpty(completableFutures)) {
                    try {
                        // 多线程并行处理请求
                        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
                    } catch (InterruptedException | ExecutionException e) {
                        LOGGER.error("handle request error: {}", e.getMessage(), e);
                    }
                }

                // 创建一个MergeResultMessage请求,把结果集合都放进去该请求中然后异步发送给客户端
                MergeResultMessage resultMessage = new MergeResultMessage();
                resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
            }
        }
        // 条件成立:说明发送过来的是单个请求
        else {
            final AbstractMessage msg = (AbstractMessage) message;
            // 处理这个请求,得到处理结果
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            // 响应这个处理结果
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }

当服务端接收到MergedWarpMessage请求的时候,会有两种处理方式:

  • 服务端开启了批量响应并且客户端版本大于等于1.5:
/**
 * 发送批量响应请求的定时任务
 *
 * @since 1.5.0
 */
private class BatchResponseRunnable implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (batchResponseLock) {
                try {
                    batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS);
                } catch (InterruptedException e) {
                    LOGGER.error("BatchResponseRunnable Interrupted error", e);
                }
            }
            isResponding = true;
            basketMap.forEach((channel, msgQueue) -> {
                if (msgQueue.isEmpty()) {
                    return;
                }
                // Because the [serialization,compressor,rpcMessageId,headMap] of the response
                // needs to be the same as the [serialization,compressor,rpcMessageId,headMap] of the request.
                // Assemble by grouping according to the [serialization,compressor,rpcMessageId,headMap] dimensions.
                Map<ClientRequestRpcInfo, BatchResultMessage> batchResultMessageMap = new HashMap<>();
                while (!msgQueue.isEmpty()) {
                    QueueItem item = msgQueue.poll();
                    BatchResultMessage batchResultMessage = CollectionUtils.computeIfAbsent(batchResultMessageMap,
                                                                                            new ClientRequestRpcInfo(item.getRpcMessage()),
                                                                                            key -> new BatchResultMessage());
                    batchResultMessage.getResultMessages().add(item.getResultMessage());
                    batchResultMessage.getMsgIds().add(item.getMsgId());
                }
                batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) ->
                                              remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo),
                                                                               channel, batchResultMessage));
            });
            isResponding = false;
        }
    }
}

把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,然后把这些结果以同一个channel为维度放在一个队列中,服务端会开启一个线程从每一个channel的队列获取执行结果放到一个BatchResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端 

  • 服务端没有开启批量响应或者客户端版本小于1.5:

把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,把这些结果都放到一个MergeResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端 

可以看到,其实这两种方式的差异并不大,主要的区别在于第一种方式响应给客户端的请求是由异步线程去做的,第二种方式响应给客户端是由processor本身的执行线程去做的 

RmNettyRemotingClient 

注册RM端的处理器 
/**
 * 注册RM端的处理器
 */
private void registerProcessor() {
    // 注册处理分支事务commit的handler
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);

    // 注册处理分支事务rollback的handler
    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);

    // 处理处理undoLog的handler
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);

    // 注册处理接收TC端响应的handler
    ClientOnResponseProcessor onResponseProcessor =
    new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);

    // 注册处理心跳请求的handler
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

TmNettyRemotingClient 

注册TM端的处理器 
/**
 * 注册处理器
 */
private void registerProcessor() {
    // 注册TC响应数据过来的处理器
    ClientOnResponseProcessor onResponseProcessor =
    new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);

    // 注册心跳接收的处理器
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

服务端(TC) 

抽象基类AbstractNettyRemotingServer 

Netty服务启动 
public void init() {
    super.init();
    // 启动netty服务
    serverBootstrap.start();
}
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    // 设置netty的接收handler
    serverBootstrap.setChannelHandlers(new ServerHandler());
}

重写父类的init方法,在启动的时候会调用init方法来启动一个Netty服务。那么什么时候会去调用init方法呢?答案是在Server类的start方法中 

public class Server {

    public static void start(String[] args) {
        // ......
        nettyRemotingServer.init();
    }
}

@Component
public class ServerRunner implements CommandLineRunner, DisposableBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);

    private boolean started = Boolean.FALSE;

    private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();

    public static void addDisposable(Disposable disposable) {
        DISPOSABLE_LIST.add(disposable);
    }

    @Override
    public void run(String... args) {
        try {
            long start = System.currentTimeMillis();
            Server.start(args);
            started = true;

            long cost = System.currentTimeMillis() - start;
            LOGGER.info("seata server started in {} millSeconds", cost);
        } catch (Throwable e) {
            started = Boolean.FALSE;
            LOGGER.error("seata server start error: {} ", e.getMessage(), e);
            System.exit(-1);
        }
    }

    // ......
}

而io.seata.server.Server#start方法又是被io.seata.server.ServerRunner#run所调用,而ServerRunner又实现了springboot的CommandLineRunner接口,基于springboot的扩展机制,当容器启动完成之后就会回调所有的CommandLineRunner接口的run方法,从而在服务启动的时候间接地去启动Netty服务 

NettyRemotingServer 

注册服务端的处理器 
/**
 * 注册处理器
 */
private void registerProcessor() {
    // 注册处理请求消息的处理器
    ServerOnRequestProcessor onRequestProcessor =
    new ServerOnRequestProcessor(this, getHandler());
    ShutdownHook.getInstance().addDisposable(onRequestProcessor);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);

    // 注册处理响应请求的处理器
    ServerOnResponseProcessor onResponseProcessor =
    new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);

    // 注册处理RM注册请求的处理器
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);

    // 注册处理TM注册请求的处理器
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);

    // 注册处理心跳消息的处理器
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/705144.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Dinky:问题总结

一、启动时指定flink版本&#xff0c;因为dinky本身也集成了部分flink ./auto.sh start 1.12 二、数据源管理新增mysql时的url jdbc:mysql://ip:3306/dinky?useUnicodetrue&characterEncodingutf8&useSSLfalse&autoReconnecttrue&failOverReadOnlyfalse 不要…

2、JAVA 分支结构 switch结构 for循环

1 分支结构 1.1 概述 顺序结构的程序虽然能解决计算、输出等问题 但不能做判断再选择。对于要先做判断再选择的问题就要使用分支结构 1.2 形式 1.3.1 练习&#xff1a;商品打折案例 创建包: cn.tedu.basic 创建类: TestDiscount.java 需求: 接收用户输入的原价。满1000打9折…

网工内推 | 运营商招网工,3年以上网安经验,CISP/CCIE认证优先

01 微算互联 招聘岗位&#xff1a;网络工程师 职责描述&#xff1a; 1、负责生产系统的网络管理、网络监控告警、网络设备数据资料备份/恢复容灾管理&#xff1b; 2、海外、tob 、私有云网络搭建及运维7 X 24小时值班&#xff1b; 3、负责业务系统工程网络层面规划、建设、升级…

Java之Javac、JIT、AOT之间的关系

Javac&#xff1a;javac 是java语言编程编译器。全称java compiler。但这时候还是不能直接执行的&#xff0c;因为机器只能读懂汇编&#xff0c;也就是二进制&#xff0c;因此还需要进一步把.class文件编译成二进制文件。 Java的执行过程 详细流程 结论&#xff1a;javac编译后…

使用Python调用aapt命令查看APK文件信息

以下演示内容在window的操作系统 1、下载 aapt 下载完成后注意配置环境变量&#xff01;&#xff01;&#xff01; 地址&#xff1a;https://www.mediafire.com/file/e8ww8wbgcowbti4/aapt 2、代码实现 import os import re import subprocess#获取当前操作系统 current_os o…

S型平滑函数功能块(CODESYS ST完整源代码)

S型平滑函数在多段曲线控温上的应用。完整算法介绍请参看下面文章博客: 带平滑功能的斜坡函数(多段曲线控温纯S型曲线SCL源代码+完整算法分析)_RXXW_Dor的博客-CSDN博客PLC运动控制基础系列之梯形速度曲线,可以参看下面这篇博客:PLC运动控制基础系列之梯形速度曲线_RXXW_…

RTL8720CF烧录工具

一、环境准备 1、 解压烧录工具包 AmebaZII_PGTool_v1.2.39.zip 2、 日志串口接串口调试助手 3、 模组A0脚接高电平 二、烧录 1、模组重新上电&#xff0c;串口有Download Image over …… 输出&#xff0c;表示模组已进入烧录模式&#xff0c;如图&#xff1a; 2、打开上…

springboot+vue校园一卡通管理系统_q7e7o

近些年来&#xff0c;随着科技的飞速发展&#xff0c;互联网的普及逐渐延伸到各行各业中&#xff0c;给人们生活带来了十分的便利&#xff0c;校园一卡通利用计算机网络实现信息化管理&#xff0c;使整个校园一卡通管理的发展和服务水平有显著提升。 本文拟采用java技术和Sprin…

【算法题】动态规划中级阶段之买卖股票的最佳时机、三角形最小路径和

动态规划中级阶段 前言一、三角形最小路径和1.1、思路1.2、代码实现 二、买卖股票的最佳时机 II2.1、思路2.2、代码实现 总结 前言 动态规划&#xff08;Dynamic Programming&#xff0c;简称 DP&#xff09;是一种解决多阶段决策过程最优化问题的方法。它是一种将复杂问题分解…

计算机网络————运输层

文章目录 概述UDPTCP首部格式 连接管理连接建立连接释放 概述 从IP层看&#xff0c;通信双方是两个主机。 但真正进行通信的实体是在主机中的进程&#xff0c;是这个主机中的一个进程和另一个主机中的一个进程在交换数据。 所以严格的讲&#xff0c;两个主机进行通信就是两个…

关于【系统学习】和【按需学习】我想说的

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;心得、闲聊、学习、知识☀️每日 一言&#xff1a;不要停驻不前&#xff0c;做一点点都要比什么都不做强上百倍&#xff01; 前言 说起来学习&#xff0c;那就离不开学习的方式。学生时期我们的…

SpringCloudAlibaba实战入门之RocketMQ消息发送(六)

本篇文章是承接上一篇文章《SpringCloudAlibaba实战入门之RocketMQ下载配置和启动(五)》,如果没有看过上一篇文章并按照指导配置和启动Rocket MQ的网友,请先阅读该篇文章以后再阅读本篇 一、创建spring-cloud-rocketmq项目 1、复制之前的项目模块新建一个项目模块,修改新…

5个有趣实用的小工具推荐,让你的生活更加丰富多彩

5个有趣实用的小工具推荐&#xff0c;让你的生活更加丰富多彩 在现代社交媒体和科技的发展中&#xff0c;我们可以利用各种小工具来增添生活的乐趣和便利。今天&#xff0c;我将向大家推荐一些有趣且实用的小工具&#xff0c;这些工具不仅能够让你的生活更加丰富多彩&#xff…

算法分析基础题目

第一章-算法概述 递归算法必须具备的两个条件是边界条件或停止条件和递推方程或递归方程冒泡排序时间复杂度是___&#xff0c;堆排序时间复杂度是___。 O ( n 2 ) O(n^2) O(n2), O ( n l o g n ) O(nlogn) O(nlogn)斐波那契数列的第1项为1&#xff0c;第2项为2&#xff0c;以…

Labview通过OPC与S1200通信

一、配置PC的IP地址 二、S7-1200的配置 通过博图&#xff0c;在PLC CPU的属 性-常规-保护里勾选“允许从 远程伙伴使用PUT/GET通信 访问 三、新建一个DB1数据块&#xff0c;在DB1里新建一个变量&#xff0c;例如 名称为“ASD”&#xff0c;类型为“Word” 四、右击“DB1”&…

全球项目管理软件排行榜揭晓:谁将问鼎榜首?

项目管理软件是一种能够帮助企业和组织有效规划、执行和监控项目的工具。这些软件通常具有任务分配、资源管理、时间跟踪、报告生成等功能&#xff0c;可以提高项目管理的效率和质量。 项目管理软件为企业带来的便利 使用项目管理软件可以帮助企业和组织更好地管理项目&#x…

基于UDP协议的千兆以太网传输(FPGA)

[TOC]基于UDP协议的千兆以太网传输&#xff08;FPGA&#xff09; 一、UDP协议概述 UDP协议是一种基于无连接协议&#xff0c;即发送端发送数据无需确认接收端是否存在&#xff1b;接收端收到数据后也无需给发送端反馈是否收到&#xff0c;所以UDP在数据发送过程中允许丢失一两…

学会Pointer指针事件 ,一套拖拽事件两端(PC端、移动端)跑

早期浏览器很low&#xff0c;它只存在鼠标事件(MouseEvent)。随时代的发展出现了智能手机、平板电脑等触屏设备&#xff0c;交互方式发生了变化&#xff0c;单纯的鼠标事件已不够开发人员使用了。于是引入了触摸事件(TouchEvent)。不过这还不够完美&#xff0c;没有把触控笔事件…

深度卷积神经网络(AlexNet)

目录 1.基础简介 1.1基础介绍 1.2基础架构 2.Alexnet与LeNet的对比 3.参考代码 4.李沐老师给出的例子 1.基础简介 1.1基础介绍 2012年&#xff0c;AlexNet横空出世。它首次证明了学习到的特征可以超越手工设计的特征。它一举打破了计算机视觉研究的现状。 AlexNet使用…

[洛谷]P1162 填涂颜色(搜索连通块)

关键思路转换&#xff1a;从边界为0开始搜索&#xff0c;并且都标记&#xff0c;这些标记的不会被1包围&#xff0c;被1包围的肯定0不会被标记到&#xff0c;所以到时候把没被标记的0就是变成2即可。 详细&#xff1a; ACcode: #include<bits/stdc.h> using namespace s…