Seata服务端的启动过程 学习记录

news2025/1/16 8:04:48

1.ServerRunner

ServerRunner类实现了CommandLineRunner与DisposableBean接口,将会在Spring容器启动和关闭的时间,分别执行

run 和 destory 方法。

而seata服务端的启动过程,都藏在run方法中

image-20230611101314534

2.整体流程

io.seata.server.Server#start

 public static void start(String[] args) {
        // create logger
        final Logger logger = LoggerFactory.getLogger(Server.class);

        //initialize the parameter parser
        //Note that the parameter parser should always be the first line to execute.
        //Because, here we need to parse the parameters needed for startup.
        //1.解析配置文件
        ParameterParser parameterParser = new ParameterParser(args);

        //2.初始化监控
        MetricsManager.get().init();

        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());



        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

        //3.创建netty用于服务端TC与客户端TM,RM通信
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

        //4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务id
        UUIDGenerator.init(parameterParser.getServerNode());

        //log store mode : file, db, redis
        //5.设置全局事务与分支事务持久化的三种方式
        SessionHolder.init(parameterParser.getSessionStoreMode());
        LockerManagerFactory.init(parameterParser.getLockStoreMode());

        //6.创建并初始化事务协调者 并绑定到NettyRemotingServer到TransactionMessageHandler,监听事务消息并处理
        DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
        coordinator.init();
        nettyRemotingServer.setHandler(coordinator);

        //7.注册销毁事件
        // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
        ServerRunner.addDisposable(coordinator);

        //127.0.0.1 and 0.0.0.0 are not valid here.
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
            if (StringUtils.isNotBlank(preferredNetworks)) {
                XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
            } else {
                XID.setIpAddress(NetUtil.getLocalIp());
            }
        }

        //8.启动nettyRemotingServer
        nettyRemotingServer.init();
    }

1.解析配置文件

主要从三处地方获取配置

  1. 启动命令中

    解析启动命令使用的是JCommander,主要有以下的命令

    1. help 打印帮助信息

    2. –host -h :设置注册中心ip

    3. “–port”, “-p” :设置注册中心端口

    4. “–storeMode”, “-m” :设置日志存储模式

    5. “–serverNode”, “-n” :设置服务节点

    6. “–seataEnv”, “-e” : seata环境设置

    7. “–sessionStoreMode”, “-ssm” : 设置会话存储模式,主要有三种文件,数据库,redis

    8. “–lockStoreMode”, “-lsm” : 设置锁信息的存储,主要有三种文件,数据库,redis

      image-20230611104649382

  2. 容器中

  3. 配置中心或者文件中

      private void init(String[] args) {
            try {
                //从命令行中获取配置信息
                getCommandParameters(args);
                //从环境(容器中)获取配置信息
                getEnvParameters();
                if (StringUtils.isNotBlank(seataEnv)) {
                    System.setProperty(ENV_PROPERTY_KEY, seataEnv);
                }
                //从文件中获取配置信息
                if (StringUtils.isBlank(storeMode)) {
                    storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);
                }
                if (StringUtils.isBlank(sessionStoreMode)) {
                    sessionStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode);
                }
                if (StringUtils.isBlank(lockStoreMode)) {
                    lockStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode);
                }
            } catch (ParameterException e) {
                printError(e);
            }
    
        }
    

2. 初始化监控信息

MetricsManager.get().init();

image-20230611103309800

默认是关闭的

3.创建netty用于服务端TC与客户端TM,RM通信

 ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

        //3.创建netty用于服务端TC与客户端TM,RM通信
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);

父抽象类 AbstractNettyRemotingServer 中设置 消息处理器

 public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
        super(messageExecutor);
        serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
        serverBootstrap.setChannelHandlers(new ServerHandler());
    }

4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务id

      //4.初始化UUID生成器(雪花算法),用于生成全局事务与分支事务id
        UUIDGenerator.init(parameterParser.getServerNode());

5.初始化SessionManager与LockManager,管理会话与锁信息

SessionHolder.init(parameterParser.getSessionStoreMode());
        LockerManagerFactory.init(parameterParser.getLockStoreMode());

SessionManager有三个不同的实现,通过SPI机制加载

io.seata.server.storage.file.session.FileSessionManager
io.seata.server.storage.db.session.DataBaseSessionManager
io.seata.server.storage.redis.session.RedisSessionManager

resource下的META-INF文件夹中的service文件,根据要加载类的权限定类名找对应的文件,文件中存放着对应需要加载的类的权限定类型

image-20230611105552100

io.seata.server.session.SessionHolder#init

public static void init(String mode) {
        if (StringUtils.isBlank(mode)) {
            mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
                    CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
        }
        StoreMode storeMode = StoreMode.get(mode);
        if (StoreMode.DB.equals(storeMode)) {
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});

            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
        } else if (StoreMode.FILE.equals(storeMode)) {
            String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
                    DEFAULT_SESSION_STORE_FILE_DIR);
            if (StringUtils.isBlank(sessionStorePath)) {
                throw new StoreException("the {store.file.dir} is empty.");
            }
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
            ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
            RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
            RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;

            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
        } else if (StoreMode.REDIS.equals(storeMode)) {
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});

            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
        } else {
            // unknown store
            throw new IllegalArgumentException("unknown store mode:" + mode);
        }
        reload(storeMode);
    }
EnhancedServiceLoader.load(SessionManager.class, xxxName);

通过SPI机制获取所有需要装配的类后,通过参数二最终选择某一个类

@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager
@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable {
@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager

除了META-INF/services/ 路径,还会找 META-INF/seata/路径下

io.seata.common.loader.EnhancedServiceLoader.InnerEnhancedServiceLoader#findAllExtensionDefinition

image-20230611134407708

LockManager同样也有三个实现类,也是通过SPI机制加载

io.seata.server.storage.db.lock.DataBaseLockManager
io.seata.server.storage.file.lock.FileLockManager
io.seata.server.storage.redis.lock.RedisLockManager

6.创建并初始化事务协调者 DefaultCoordinator 并监听事务消息并处理

创建DefaultCoordinator实列

实列 DefaultCoordinator最重要的就是给两个属性赋值

  1. remotingServer

  2. core

remotingServer是用来TC与TM,RM通信的,而core则是真正的事务协调者(TC)

core的coreMap属性将缓存4个实现类,对应seata的四种模式,XA,AT,TCC,SAGA,实现类同样也是通过SPI机制加载,并在完成加载后缓存到coreMap

key为枚举类BranchType到值,value为SPI加载到对应类

io.seata.server.coordinator.DefaultCore#DefaultCore

image-20230611140027239

image-20230611140005213

image-20230611140232206

DefaultCoordinator 初始化

初始化即启动5个周期线程

			//周期重试回滚事务
        retryRollbacking.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
            ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        //周期重试提交事务
        retryCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
            COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        //周期异步提交事务
        asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
            ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        //周期超时检测
        timeoutCheck.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
            TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

        //周期清理回滚日志
        undoLogDelete.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
            UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);

每个线程池的任务逻辑基本都是一样的,只不过针对不同的事务状态

 protected void handleRetryRollbacking() {
        //条件对象,只处理对应状态的事务会话
        SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
        sessionCondition.setLazyLoadBranch(true);
        //查找所有的全局会话 如果当前事务会话持久方式是DB,则从表中条件查找
        Collection<GlobalSession> rollbackingSessions =
            SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
        //如果为空,表示当前没有事务,则直接返回
        if (CollectionUtils.isEmpty(rollbackingSessions)) {
            return;
        }
        long now = System.currentTimeMillis();
        //遍历所有的事务会话
        SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
            try {
                //如果是正在回滚中的或者已经死亡的事务会话,跳过
                // prevent repeated rollback
                if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                    && !rollbackingSession.isDeadSession()) {
                    // The function of this 'return' is 'continue'.
                    return;
                }
                //如果超时
                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                    //有锁则释放全局锁
                    if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                        rollbackingSession.clean();
                    }
                    //删除全局事务会话
                    // Prevent thread safety issues
                    SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                    LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());

                    SessionHelper.endRollbackFailed(rollbackingSession, true);

                    // rollback retry timeout event
                    MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);

                    //The function of this 'return' is 'continue'.
                    return;
                }
                rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                //如果没有超时,执行回滚
                core.doGlobalRollback(rollbackingSession, true);
            } catch (TransactionException ex) {
                LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
            }
        });
    }

nettyRemotingServer 通信服务类设置handler

将创建的TC绑定到nettyRemotingServer

7.注册销毁事件

        // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
        ServerRunner.addDisposable(coordinator);

将会在Sping容器关闭的时候调用coordinator的destory方法

    @Override
    public void destroy() {
        // 1. first shutdown timed task
        retryRollbacking.shutdown();
        retryCommitting.shutdown();
        asyncCommitting.shutdown();
        timeoutCheck.shutdown();
        undoLogDelete.shutdown();
        branchRemoveExecutor.shutdown();
        try {
            retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
            retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
            asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
            timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
            undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
            branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ignore) {

        }
        // 2. second close netty flow
        if (remotingServer instanceof NettyRemotingServer) {
            ((NettyRemotingServer) remotingServer).destroy();
        }
        // 3. third destroy SessionHolder
        SessionHolder.destroy();
        instance = null;
    }

8.启动NettyRemotingServer,处理TM与RM消息

    @Override
    public void init() {
        // registry processor
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            super.init();
        }
    }

registerProcessor 方法主要是注册一系列消息处理器

 private void registerProcessor() {
        // 1. registry on request message processor
        ServerOnRequestProcessor onRequestProcessor =
            new ServerOnRequestProcessor(this, getHandler());
        ShutdownHook.getInstance().addDisposable(onRequestProcessor);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        // 2. registry on response message processor
        ServerOnResponseProcessor onResponseProcessor =
            new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
        // 3. registry rm message processor
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        // 4. registry tm message processor
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        // 5. registry heartbeat message processor
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }

调用 super.registerProcessor 方法 将 消息处理器与线程池封装成一个Pair,然后在放到map中,map的key为消息类型,value为Pair。

ServerOnRequestProcessor 的 transactionMessageHandler 成员变量为第6步创建的TC

image-20230611143404021

super.init 方法则是启动netty

   @Override
    public void init() {
        //启动超时检测
        super.init();
        //启动netty
        serverBootstrap.start();
    }
 @Override
    public void start() {
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                    nettyServerConfig.getWriteBufferHighWaterMark()))
            .localAddress(new InetSocketAddress(getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }

                }
            });

        try {
            this.serverBootstrap.bind(getListenPort()).sync();
            XID.setPort(getListenPort());
            LOGGER.info("Server started, service listen port: {}", getListenPort());
            RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
            initialized.set(true);
        } catch (SocketException se) {
            throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
        } catch (Exception exx) {
            throw new RuntimeException("Server start failed", exx);
        }
    }

在第3步时,创建并设置了双向通道消息处理器 io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler

 public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
        super(messageExecutor);
        serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
        serverBootstrap.setChannelHandlers(new ServerHandler());
    }

image-20230611143857729

ChannelDuplexHandler是Netty框架中的一个双向通道处理器,用于处理网络通信中的读写事件。它继承自ChannelInboundHandler和ChannelOutboundHandler,可以同时处理入站和出站事件。

当通道中有消息时,将调用io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler#channelRead方法

处理消息

    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //根据消息的类型,获取Pair,找到消息处理器
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                //处理消息
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@")[0];
                            long idx = System.currentTimeMillis();
                            try {
                                String jstackFile = idx + ".log";
                                LOGGER.info("jstack command will dump to " + jstackFile);
                                Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                            } catch (IOException exx) {
                                LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }

假设此次消息是TM开启全局事务的单条消息,则将交由ServerOnRequestProcessor处理,ServerOnRequestProcessor处理的消息类型如下

 * RM:
 * 1) {@link MergedWarpMessage}
 * 2) {@link BranchRegisterRequest}
 * 3) {@link BranchReportRequest}
 * 4) {@link GlobalLockQueryRequest}
 * TM:
 * 1) {@link MergedWarpMessage}
 * 2) {@link GlobalBeginRequest}
 * 3) {@link GlobalCommitRequest}
 * 4) {@link GlobalReportRequest}
 * 5) {@link GlobalRollbackRequest}
 * 6) {@link GlobalStatusRequest}
 @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }
 private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,
                NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            try {
                BatchLogHandler.INSTANCE.getLogQueue()
                    .put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"
                        + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        //处理多条消息
        // the batch send request message
        if (message instanceof MergedWarpMessage) {
            if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
                && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
                List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
                List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
                for (int i = 0; i < msgs.size(); i++) {
                    AbstractMessage msg = msgs.get(i);
                    int msgId = msgIds.get(i);
                    if (PARALLEL_REQUEST_HANDLE) {
                        CompletableFuture.runAsync(
                            () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                    } else {
                        handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                    }
                }
            } else {
                List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();
                List<CompletableFuture<Void>> completableFutures = null;
                for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
                    if (PARALLEL_REQUEST_HANDLE) {
                        if (completableFutures == null) {
                            completableFutures = new ArrayList<>();
                        }
                        int finalI = i;
                        completableFutures.add(CompletableFuture.runAsync(() -> {
                            results.add(finalI, handleRequestsByMergedWarpMessage(
                                ((MergedWarpMessage)message).msgs.get(finalI), rpcContext));
                        }));
                    } else {
                        results.add(i,
                            handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
                    }
                }
                if (CollectionUtils.isNotEmpty(completableFutures)) {
                    try {
                        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
                    } catch (InterruptedException | ExecutionException e) {
                        LOGGER.error("handle request error: {}", e.getMessage(), e);
                    }
                }
                MergeResultMessage resultMessage = new MergeResultMessage();
                resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
            }
        } else {
            //处理单条消息
            // the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }

最终将调用io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin

    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
            throws TransactionException {
        //获取全局事务id放入response中
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
                request.getTransactionName(), request.getTimeout()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                    rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }

core.begin方法

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        session.begin();

        // transaction start event
        MetricsPublisher.postSessionDoingEvent(session, false);

        return session.getXid();
    }
    public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;
        this.lazyLoadBranch = lazyLoadBranch;
        if (!lazyLoadBranch) {
            this.branchSessions = new ArrayList<>();
        }
        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
        this.xid = XID.generateXID(transactionId);
    }

首先通过雪花算法生成事务id,然后调用 XID.generateXID(transactionId)

最终格式为 ip:端口号:事务id

public static String generateXID(long tranId) {
        return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();
    }

【大部分内容来源于https://saint.blog.csdn.net/article/details/126457129。推荐各位学习seata的朋友看一看】

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/633854.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于html+css的图展示120

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

【JMeter压力测试】通过jmeter压测surging

目录 前言 环境 下载配置源码 JMeter和JDK下载 JDKJmeter安装 Jmeter非GUI运行压测 结尾 前言 surging是异构微服务引擎&#xff0c;提供了模块化RPC请求通道&#xff0c;引擎在RPC服务治理基础之上还提供了各种协议&#xff0c;并且还提供了stage组件&#xff0c;以便针…

最新版CleanMyMac X4.13.4中文版Mac清理软件

cleanmymac是一款强大的Mac系统垃圾清理工具,可以清除Mac系统多余的语言包,系统缓存,应用程序!可智能清理mac磁盘垃圾和多余语言安装包&#xff0c;快速释放电脑内存&#xff0c;轻松管理和升级Mac上的应用。同时CleanMyMac X可以强力卸载恶意软件&#xff0c;修复系统漏洞&…

EXCEL函数2(统计函数,逻辑函数及其余函数)

统计函数 1、COUNT&#xff08;单元格范围&#xff09;&#xff1a; 计算单元格范围的行数&#xff0c;比如用光标选中一定范围内的单元格&#xff0c;那么只要单元格里面有值&#xff0c;那么count函数便会将有值的单元格的数量统计出来 2、COUNTA&#xff08;单元格范围&am…

msf渗透测试学习-与永恒之蓝漏洞案例

MSF是Metasploit Framework的缩写&#xff0c;是一款广泛使用的渗透测试工具&#xff0c;具有强大的攻击功能。它提供了一个模块化的平台&#xff0c;通过将各种攻击载荷、漏洞利用和辅助工具组装在一起&#xff0c;可用于模拟各种攻击&#xff0c;测试系统安全性&#xff0c;也…

Task Add-in Sample (C#)

下例显示了用 C# 编写Task Add-in 的完整源代码。 使用 C# 类库 &#xff08;.NET Framework&#xff09; 创建 Visual Studio 中的项目。实现 IEdmAddIn5。在“任务属性”对话框中创建自定义页。自定义任务详细信息页面。 注意&#xff1a; 若要填充下面的 GUID 属性&#x…

【linux】登录root账户时报错Sorry, that didn‘t work. Please try again.抱歉,这不管用,请再试一次

一、问题背景 登录其他普通账户的GUI桌面&#xff0c;发现都很正常&#xff0c;但是登录管理员账户root的桌面&#xff0c;重启之后一段时间正常&#xff0c;过一段时间就会出现登录报错Sorry, that didn’t work. Please try again. 二、解决办法——配置文件的解析 下面给出…

由于找不到msvcp120.dll丢失的解决方法,计算机丢失msvcp120.dll修复教程

在打开游戏或者软件的时候&#xff0c;计算机提示由于找不到msvcp120.dll&#xff0c;无法继续执行此代码怎么办呢&#xff1f;msvcp120.dll是一个动态链接库&#xff08;DLL&#xff09;文件&#xff0c;其作用是提供一些常用的C函数和类库&#xff0c;以便在Windows操作系统上…

高手都是如何做 Mysql 慢 SQL 优化

tip&#xff1a;作为程序员一定学习编程之道&#xff0c;一定要对代码的编写有追求&#xff0c;不能实现就完事了。我们应该让自己写的代码更加优雅&#xff0c;即使这会费时费力。 &#x1f495;&#x1f495; 推荐&#xff1a;体系化学习Java&#xff08;Java面试专题&#…

2017~2018学年《信息安全》考试试题(A2卷)

北京信息科技大学&#xff0c;2017~2018 学年第二学期《信息安全》考试试题&#xff08;A 卷&#xff09; 适用专业班级&#xff1a;计科15级 重修课程所在学院&#xff1a;计算机学院 考试形式&#xff1a;闭卷 一、单选题&#xff08;本题满分20分&#xff0c;共含10 道小题…

《Java从入门到精通》学习笔记

Java从入门到精通学习笔记 第一章 初识java a) Java是一种通过解释方式来执行的语言。 b) Java语言编写的程序既是编译型&#xff0c;又是解释型的。编译只进行一次&#xff0c;而解释在每次运行程序时都会进行。 c) JDK下载安装 i. path:jdk/bin ii. calsspath:jdk/jre/lib ii…

融合动态反向学习的阿奎拉鹰与哈里斯鹰混合优化算法(DAHHO)-附代码

融合动态反向学习的阿奎拉鹰与哈里斯鹰混合优化算法(DAHHO) 文章目录 融合动态反向学习的阿奎拉鹰与哈里斯鹰混合优化算法(DAHHO)1.哈里斯鹰优化算法2.改进哈里斯鹰优化算法2.1 动态反向学习策略2.2 改进混合算法理论分析 3.实验结果4.参考文献5.Matlab代码6.python代码 摘要&a…

融合黄金正弦和随机游走的哈里斯鹰优化算法(GSHHO)-附代码

融合黄金正弦和随机游走的哈里斯鹰优化算法(GSHHO) 文章目录 融合黄金正弦和随机游走的哈里斯鹰优化算法(GSHHO)1.哈里斯鹰优化算法2.改进哈里斯鹰优化算法2.1 黄金正弦算法2.2 非线性能量指数递减策略2.3 高斯随机游走策略 3.实验结果4.参考文献5.Matlab代码6.python代码 摘要…

【软件环境安装部署】华为云服务器下 Docker 安装 MongoDB 以及 SpringBoot 整合 MongoDB 开发使用

文章目录 安装测试 MongoDB拉取镜像创建和启动容器登录mongo容器&#xff0c;并进入到【admin】数据库创建一个用户&#xff0c;mongo 默认没有用户连接mongo数据库测试数据库&#xff0c;插入一条语句测试数据库&#xff0c;查询刚才插入的语句查看所有数据库开放指定端口开放…

(八)CSharp-泛型中的方法结构委托接口(2)

一、泛型方法 泛型方法可以在泛型和非泛型类以及结构和接口中声明。 1、声明泛型方法 泛型方法具有类型参数列表和可选的约束。 泛型方法有两个参数列表。 封闭在圆括号内的方法参数列表。封闭在尖括号内的类型参数列表。 要声明泛型方法&#xff0c;需要&#xff1a; 在方法…

Java程序猿搬砖笔记(十四)

文章目录 MySQL自定义排序用locate本质是数字排序case when then...语法排序field()函数排序 阿里云Maven镜像仓库无法下载spring-cloud-dependencies依赖问题MySQL更新同一个表的同个字段解决方法一(推荐)解决方法二 SpringBoot返回的JSON中的null转换为空字符串Spring的相关注…

OpenGL光照之光照贴图

文章目录 漫反射贴图镜面光贴图放射光贴图代码 每个物体都拥有自己独特的材质从而对光照做出不同的反应的方法。这样子能够很容易在一个光照的场景中给每个物体一个独特的外观&#xff0c;但是这仍不能对一个物体的视觉输出提供足够多的灵活性。 我们将整个物体的材质定义为一个…

Linux命令:lsof

目录 一、理论 1.lsof 二、实验 1.无参数 2.-p 参数 3.-l 参数 4. -u 参数 5.-c 参数 6.-d 参数 7.fileName 8. -i 参数 一、理论 1.lsof (1)概念 命令 lsof &#xff08; list opened files &#xff09;负责列出系统中已经打开的文件&#xff0c;包括普通文件&a…

【uni-app】使用uni-app实现简单的登录注册功能

文章目录 前言一、页面布局二、注册页面1.注册接口使用2.注册成功提示3.注册成功页面跳转4.完整代码 三、登录页面1.登录接口使用2.本地存储使用3.完整代码 总结 前言 大家好&#xff0c;今天和大家分享一下如何在uni-app中实现简单的登录注册功能。 首先你需要掌握一下知识点…

【SQL Server】数据库开发指南(九)详细讲解 MS-SQL 触发器的的创建、修改、应用与适用场景

本系列博文还在更新中&#xff0c;收录在专栏&#xff1a;#MS-SQL Server 专栏中。 本系列文章列表如下&#xff1a; 【SQL Server】 Linux 运维下对 SQL Server 进行安装、升级、回滚、卸载操作 【SQL Server】数据库开发指南&#xff08;一&#xff09;数据库设计的核心概念…