1. ServerRunner
The ServerRunner class implements the CommandLineRunner and DisposableBean interfaces, so Spring calls its run method when the container starts and its destroy method when the container shuts down.
The entire startup process of the Seata server is hidden inside that run method.
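A minimal sketch of that pattern (simplified, with hypothetical class and method bodies; the real ServerRunner also keeps a static list of Disposable objects registered through addDisposable):
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class ServerRunnerSketch implements CommandLineRunner, DisposableBean {

    @Override
    public void run(String... args) throws Exception {
        // invoked once the Spring container has started;
        // the real ServerRunner delegates to Server.start(args) here
    }

    @Override
    public void destroy() throws Exception {
        // invoked when the Spring container shuts down;
        // the real ServerRunner iterates its registered Disposable objects (e.g. DefaultCoordinator)
    }
}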
2. Overall flow
io.seata.server.Server#start
public static void start(String[] args) {
    // create logger
    final Logger logger = LoggerFactory.getLogger(Server.class);
    // initialize the parameter parser
    // Note that the parameter parser should always be the first line to execute.
    // Because, here we need to parse the parameters needed for startup.
    // 1. parse the configuration
    ParameterParser parameterParser = new ParameterParser(args);
    // 2. initialize metrics
    MetricsManager.get().init();
    System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
    ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
        NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
        new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
    // 3. create the Netty server used for communication between the server-side TC and the client-side TM/RM
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
    // 4. initialize the UUID generator (snowflake algorithm), used to generate global and branch transaction ids
    UUIDGenerator.init(parameterParser.getServerNode());
    // log store mode : file, db, redis
    // 5. choose one of the three persistence modes for global and branch transaction sessions
    SessionHolder.init(parameterParser.getSessionStoreMode());
    LockerManagerFactory.init(parameterParser.getLockStoreMode());
    // 6. create and initialize the transaction coordinator, then bind it to NettyRemotingServer as the
    //    TransactionMessageHandler that listens for and handles transaction messages
    DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
    coordinator.init();
    nettyRemotingServer.setHandler(coordinator);
    // 7. register the destroy hook
    // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
    ServerRunner.addDisposable(coordinator);
    // 127.0.0.1 and 0.0.0.0 are not valid here.
    if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
        XID.setIpAddress(parameterParser.getHost());
    } else {
        String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
        if (StringUtils.isNotBlank(preferredNetworks)) {
            XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
    }
    // 8. start nettyRemotingServer
    nettyRemotingServer.init();
}
1. Parse the configuration
Configuration is read from three places:
- The startup command line. Command-line parsing uses JCommander (see the sketch after the init method below); the main options are:
  - help: print the help message
  - "--host", "-h": the IP to register to the registry center
  - "--port", "-p": the port the server listens on
  - "--storeMode", "-m": the log store mode
  - "--serverNode", "-n": the server node id
  - "--seataEnv", "-e": the Seata runtime environment
  - "--sessionStoreMode", "-ssm": the session store mode (file, db, or redis)
  - "--lockStoreMode", "-lsm": the lock store mode (file, db, or redis)
- The environment (e.g. when running in a container)
- The configuration center or configuration file
private void init(String[] args) {
    try {
        // get configuration from the command line
        getCommandParameters(args);
        // get configuration from the environment (container)
        getEnvParameters();
        if (StringUtils.isNotBlank(seataEnv)) {
            System.setProperty(ENV_PROPERTY_KEY, seataEnv);
        }
        // fall back to the configuration file / configuration center
        if (StringUtils.isBlank(storeMode)) {
            storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);
        }
        if (StringUtils.isBlank(sessionStoreMode)) {
            sessionStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode);
        }
        if (StringUtils.isBlank(lockStoreMode)) {
            lockStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode);
        }
    } catch (ParameterException e) {
        printError(e);
    }
}
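The following self-contained sketch shows the JCommander style that ParameterParser relies on; the field names mirror the options listed above, but this is not the real class, which declares more options, defaults and descriptions:
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

public class StartupArgs {

    @Parameter(names = {"--host", "-h"}, description = "The ip to register to registry center")
    private String host;

    @Parameter(names = {"--port", "-p"}, description = "The port to listen")
    private int port = 8091;

    @Parameter(names = {"--storeMode", "-m"}, description = "log store mode: file, db, redis")
    private String storeMode;

    public static void main(String[] args) {
        StartupArgs parsed = new StartupArgs();
        // JCommander binds e.g. "--host 10.0.0.1 --port 8091 -m db" onto the annotated fields
        JCommander.newBuilder().addObject(parsed).build().parse(args);
        System.out.println(parsed.host + ":" + parsed.port + " store=" + parsed.storeMode);
    }
}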
2. Initialize metrics
MetricsManager.get().init();
Metrics collection is disabled by default.
3. Create the Netty server used for communication between the server-side TC and the client-side TM/RM
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
    NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
    new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
// 3. create the Netty server used for communication between the server-side TC and the client-side TM/RM
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
The parent abstract class AbstractNettyRemotingServer installs the message handler in its constructor:
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    serverBootstrap.setChannelHandlers(new ServerHandler());
}
4. Initialize the UUID generator (snowflake algorithm), used to generate global and branch transaction ids
// 4. initialize the UUID generator (snowflake algorithm), used to generate global and branch transaction ids
UUIDGenerator.init(parameterParser.getServerNode());
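To make the idea concrete, here is a minimal snowflake-style generator; this is an illustration only, not Seata's actual IdWorker, which uses its own bit layout and overflow handling:
public class SnowflakeSketch {
    private final long nodeId;          // comes from --serverNode / -n
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public SnowflakeSketch(long nodeId) {
        this.nodeId = nodeId & 0x3FF;   // 10 bits reserved for the node id
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & 0xFFF;   // 12-bit sequence within one millisecond
            if (sequence == 0) {
                // sequence exhausted for this millisecond: spin until the next one
                while ((now = System.currentTimeMillis()) <= lastTimestamp) { }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        // 41-bit timestamp | 10-bit node | 12-bit sequence
        return (now << 22) | (nodeId << 12) | sequence;
    }
}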
5. Initialize the SessionManager and LockManager, which manage session and lock information
SessionHolder.init(parameterParser.getSessionStoreMode());
LockerManagerFactory.init(parameterParser.getLockStoreMode());
SessionManager has three different implementations, loaded through the SPI mechanism:
io.seata.server.storage.file.session.FileSessionManager
io.seata.server.storage.db.session.DataBaseSessionManager
io.seata.server.storage.redis.session.RedisSessionManager
The service files live in the META-INF folder under resources: the file is located by the fully qualified name of the interface to load, and it lists the fully qualified names of the implementation classes to instantiate.
io.seata.server.session.SessionHolder#init
public static void init(String mode) {
    if (StringUtils.isBlank(mode)) {
        mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
            CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
    }
    StoreMode storeMode = StoreMode.get(mode);
    if (StoreMode.DB.equals(storeMode)) {
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
            new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
    } else if (StoreMode.FILE.equals(storeMode)) {
        String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
            DEFAULT_SESSION_STORE_FILE_DIR);
        if (StringUtils.isBlank(sessionStorePath)) {
            throw new StoreException("the {store.file.dir} is empty.");
        }
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
            new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
        ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
        RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
        RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
    } else if (StoreMode.REDIS.equals(storeMode)) {
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
        ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
        RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
            StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
        DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
    } else {
        // unknown store
        throw new IllegalArgumentException("unknown store mode:" + mode);
    }
    reload(storeMode);
}
EnhancedServiceLoader.load(SessionManager.class, xxxName);
After all candidate implementations have been gathered via SPI, the second argument (the load-level name) selects which one to instantiate.
@LoadLevel(name = "db", scope = Scope.PROTOTYPE)
public class DataBaseSessionManager extends AbstractSessionManager
@LoadLevel(name = "file", scope = Scope.PROTOTYPE)
public class FileSessionManager extends AbstractSessionManager implements Reloadable {
@LoadLevel(name = "redis", scope = Scope.PROTOTYPE)
public class RedisSessionManager extends AbstractSessionManager
Besides the META-INF/services/ path, the loader also scans the META-INF/seata/ path:
io.seata.common.loader.EnhancedServiceLoader.InnerEnhancedServiceLoader#findAllExtensionDefinition
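For comparison, here is a minimal sketch of the plain JDK ServiceLoader mechanism that this extends; the Greeter interface and its implementation are hypothetical stand-ins, and Seata's EnhancedServiceLoader additionally selects by @LoadLevel name and supports constructor arguments:
import java.util.ServiceLoader;

// service interface (Seata's analogue: io.seata.server.session.SessionManager)
interface Greeter {
    String greet();
}

// one implementation; its fully qualified name would be listed, one per line,
// in a resource file named META-INF/services/Greeter
class EnglishGreeter implements Greeter {
    @Override
    public String greet() {
        return "hello";
    }
}

public class SpiDemo {
    public static void main(String[] args) {
        // ServiceLoader reads the provider-configuration file and instantiates every listed class;
        // EnhancedServiceLoader does the same, then picks one implementation by name
        for (Greeter greeter : ServiceLoader.load(Greeter.class)) {
            System.out.println(greeter.greet());
        }
    }
}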
LockManager likewise has three implementations, also loaded via SPI:
io.seata.server.storage.db.lock.DataBaseLockManager
io.seata.server.storage.file.lock.FileLockManager
io.seata.server.storage.redis.lock.RedisLockManager
6. Create and initialize the transaction coordinator DefaultCoordinator, which listens for and handles transaction messages
Creating the DefaultCoordinator instance
Instantiating DefaultCoordinator mainly assigns two fields:
- remotingServer
- core
remotingServer is used for communication between the TC and the TM/RM, while core is the actual transaction coordinator (TC) logic.
The coreMap field of core caches four implementations, one per Seata transaction mode (XA, AT, TCC, SAGA). These are also loaded through SPI and cached in coreMap once loading completes.
The key is the BranchType enum value and the value is the corresponding SPI-loaded implementation, as sketched below.
io.seata.server.coordinator.DefaultCore#DefaultCore
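A trimmed sketch of that dispatch structure (BranchType and BranchHandler here are simplified stand-ins for the real Seata types, which EnhancedServiceLoader instantiates):
import java.util.EnumMap;
import java.util.Map;

public class CoreMapSketch {

    enum BranchType { AT, TCC, SAGA, XA }

    interface BranchHandler {
        void commit(String xid);
    }

    private final Map<BranchType, BranchHandler> coreMap = new EnumMap<>(BranchType.class);

    void register(BranchType type, BranchHandler handler) {
        // DefaultCore fills its map like this once, right after SPI loading
        coreMap.put(type, handler);
    }

    void commit(BranchType type, String xid) {
        // dispatch to the mode-specific core, which is how DefaultCore routes branch operations
        coreMap.get(type).commit(xid);
    }
}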
Initializing DefaultCoordinator
Initialization boils down to starting five scheduled (periodic) tasks:
// periodically retry rolling back transactions
retryRollbacking.scheduleAtFixedRate(
    () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
    ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// periodically retry committing transactions
retryCommitting.scheduleAtFixedRate(
    () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
    COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// periodically perform asynchronous commits
asyncCommitting.scheduleAtFixedRate(
    () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
    ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// periodic timeout check
timeoutCheck.scheduleAtFixedRate(
    () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
    TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// periodically delete undo logs
undoLogDelete.scheduleAtFixedRate(
    () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
    UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
The task logic of each scheduled pool is essentially the same; only the targeted transaction status differs.
protected void handleRetryRollbacking() {
    // condition object: only sessions in the matching (rollbacking) statuses are processed
    SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
    sessionCondition.setLazyLoadBranch(true);
    // find all matching global sessions; if the session store mode is DB, this is a conditional query against the table
    Collection<GlobalSession> rollbackingSessions =
        SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
    // if empty, there is currently no such transaction, so return directly
    if (CollectionUtils.isEmpty(rollbackingSessions)) {
        return;
    }
    long now = System.currentTimeMillis();
    // iterate over all matching transaction sessions
    SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
        try {
            // skip sessions that are still rollbacking and not yet dead
            // prevent repeated rollback
            if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                && !rollbackingSession.isDeadSession()) {
                // The function of this 'return' is 'continue'.
                return;
            }
            // if the retry has timed out
            if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                // release the global lock if configured to do so
                if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                    rollbackingSession.clean();
                }
                // remove the global session
                // Prevent thread safety issues
                SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
                SessionHelper.endRollbackFailed(rollbackingSession, true);
                // rollback retry timeout event
                MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);
                // The function of this 'return' is 'continue'.
                return;
            }
            rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            // not timed out yet: perform the rollback
            core.doGlobalRollback(rollbackingSession, true);
        } catch (TransactionException ex) {
            LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
        }
    });
}
Setting the handler on the nettyRemotingServer communication class
The newly created TC (coordinator) is then bound to nettyRemotingServer as its transaction message handler.
7. Register the destroy hook
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
When the Spring container shuts down, the coordinator's destroy method will be called:
@Override
public void destroy() {
    // 1. first shutdown timed task
    retryRollbacking.shutdown();
    retryCommitting.shutdown();
    asyncCommitting.shutdown();
    timeoutCheck.shutdown();
    undoLogDelete.shutdown();
    branchRemoveExecutor.shutdown();
    try {
        retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
        branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ignore) {
    }
    // 2. second close netty flow
    if (remotingServer instanceof NettyRemotingServer) {
        ((NettyRemotingServer) remotingServer).destroy();
    }
    // 3. third destroy SessionHolder
    SessionHolder.destroy();
    instance = null;
}
8. Start the NettyRemotingServer to handle TM and RM messages
@Override
public void init() {
    // registry processor
    registerProcessor();
    if (initialized.compareAndSet(false, true)) {
        super.init();
    }
}
The registerProcessor method registers a series of message processors:
private void registerProcessor() {
    // 1. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    ShutdownHook.getInstance().addDisposable(onRequestProcessor);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
    // 3. registry rm message processor
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
super.registerProcessor wraps each message processor and its thread pool into a Pair and puts it into a map whose key is the message type and whose value is that Pair.
The transactionMessageHandler field of ServerOnRequestProcessor is the TC created in step 6.
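A trimmed, self-contained sketch of that registration table (Pair and RemotingProcessor are simplified stand-ins for the Seata types; the real table lives in AbstractNettyRemoting):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

public class ProcessorTableSketch {

    interface RemotingProcessor {
        void process(Object message) throws Exception;
    }

    static final class Pair<A, B> {
        final A first;
        final B second;
        Pair(A first, B second) { this.first = first; this.second = second; }
    }

    // messageType -> (processor, executor); a null executor means the message is
    // handled directly on the calling (Netty I/O) thread instead of being dispatched to a pool
    private final Map<Integer, Pair<RemotingProcessor, ExecutorService>> processorTable =
            new ConcurrentHashMap<>();

    void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
        processorTable.put(messageType, new Pair<>(processor, executor));
    }
}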
The super.init call then starts Netty:
@Override
public void init() {
    // start the timeout check task
    super.init();
    // start netty
    serverBootstrap.start();
}
@Override
public void start() {
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
        .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });
    try {
        this.serverBootstrap.bind(getListenPort()).sync();
        XID.setPort(getListenPort());
        LOGGER.info("Server started, service listen port: {}", getListenPort());
        RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(true);
    } catch (SocketException se) {
        throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
    } catch (Exception exx) {
        throw new RuntimeException("Server start failed", exx);
    }
}
In step 3, the duplex channel message handler io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler was created and installed:
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    serverBootstrap.setChannelHandlers(new ServerHandler());
}
ChannelDuplexHandler is Netty's duplex channel handler for read and write events: it extends ChannelInboundHandlerAdapter and implements ChannelOutboundHandler, so it can handle both inbound and outbound events.
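A minimal sketch of the inbound side of such a handler (simplified; the real ServerHandler is shared across channels and also overrides channelInactive, userEventTriggered, exceptionCaught and the write path):
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandlerSketch extends ChannelDuplexHandler {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // every decoded RpcMessage lands here; the real ServerHandler hands it to
        // processMessage(...), which looks up the processor/executor pair by message type (shown below)
        System.out.println("received: " + msg);
    }
}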
When a message arrives on the channel, io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler#channelRead is invoked, which hands the message to processMessage:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
    }
    Object body = rpcMessage.getBody();
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        // look up the Pair by message type to find the matching message processor
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    pair.getSecond().execute(() -> {
                        try {
                            // process the message
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                    if (allowDumpStack) {
                        String name = ManagementFactory.getRuntimeMXBean().getName();
                        String pid = name.split("@")[0];
                        long idx = System.currentTimeMillis();
                        try {
                            String jstackFile = idx + ".log";
                            LOGGER.info("jstack command will dump to " + jstackFile);
                            Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
                        } catch (IOException exx) {
                            LOGGER.error(exx.getMessage());
                        }
                        allowDumpStack = false;
                    }
                }
            } else {
                try {
                    pair.getFirst().process(ctx, rpcMessage);
                } catch (Throwable th) {
                    LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                }
            }
        } else {
            LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
        }
    } else {
        LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    }
}
Suppose this message is a single request from the TM to begin a global transaction; it is then handled by ServerOnRequestProcessor, which handles the following message types:
* RM:
* 1) {@link MergedWarpMessage}
* 2) {@link BranchRegisterRequest}
* 3) {@link BranchReportRequest}
* 4) {@link GlobalLockQueryRequest}
* TM:
* 1) {@link MergedWarpMessage}
* 2) {@link GlobalBeginRequest}
* 3) {@link GlobalCommitRequest}
* 4) {@link GlobalReportRequest}
* 5) {@link GlobalRollbackRequest}
* 6) {@link GlobalStatusRequest}
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (ChannelManager.isRegistered(ctx.channel())) {
        onRequestMessage(ctx, rpcMessage);
    } else {
        try {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
            }
            ctx.disconnect();
            ctx.close();
        } catch (Exception exx) {
            LOGGER.error(exx.getMessage());
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
        }
    }
}
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    Object message = rpcMessage.getBody();
    RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message,
            NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
    } else {
        try {
            BatchLogHandler.INSTANCE.getLogQueue()
                .put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:"
                    + rpcContext.getTransactionServiceGroup());
        } catch (InterruptedException e) {
            LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
        }
    }
    if (!(message instanceof AbstractMessage)) {
        return;
    }
    // handle a batch of messages
    // the batch send request message
    if (message instanceof MergedWarpMessage) {
        if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
            && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
            List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
            List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
            for (int i = 0; i < msgs.size(); i++) {
                AbstractMessage msg = msgs.get(i);
                int msgId = msgIds.get(i);
                if (PARALLEL_REQUEST_HANDLE) {
                    CompletableFuture.runAsync(
                        () -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                } else {
                    handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                }
            }
        } else {
            List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();
            List<CompletableFuture<Void>> completableFutures = null;
            for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
                if (PARALLEL_REQUEST_HANDLE) {
                    if (completableFutures == null) {
                        completableFutures = new ArrayList<>();
                    }
                    int finalI = i;
                    completableFutures.add(CompletableFuture.runAsync(() -> {
                        results.add(finalI, handleRequestsByMergedWarpMessage(
                            ((MergedWarpMessage)message).msgs.get(finalI), rpcContext));
                    }));
                } else {
                    results.add(i,
                        handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
                }
            }
            if (CollectionUtils.isNotEmpty(completableFutures)) {
                try {
                    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
                } catch (InterruptedException | ExecutionException e) {
                    LOGGER.error("handle request error: {}", e.getMessage(), e);
                }
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
        }
    } else {
        // handle a single message
        // the single send request message
        final AbstractMessage msg = (AbstractMessage) message;
        AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
    }
}
Eventually io.seata.server.coordinator.DefaultCoordinator#doGlobalBegin is called:
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    // obtain the global transaction id (XID) and put it into the response
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
            rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}
The core.begin method:
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    session.begin();
    // transaction start event
    MetricsPublisher.postSessionDoingEvent(session, false);
    return session.getXid();
}
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
    this.transactionId = UUIDGenerator.generateUUID();
    this.status = GlobalStatus.Begin;
    this.lazyLoadBranch = lazyLoadBranch;
    if (!lazyLoadBranch) {
        this.branchSessions = new ArrayList<>();
    }
    this.applicationId = applicationId;
    this.transactionServiceGroup = transactionServiceGroup;
    this.transactionName = transactionName;
    this.timeout = timeout;
    this.xid = XID.generateXID(transactionId);
}
The transaction id is first generated with the snowflake algorithm, and then XID.generateXID(transactionId) is called.
The resulting XID format is ip:port:transactionId.
public static String generateXID(long tranId) {
    return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port)
        .append(IP_PORT_SPLIT_CHAR).append(tranId).toString();
}
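A tiny stand-alone illustration of that format (the IP, port and transaction id below are made-up values; IP_PORT_SPLIT_CHAR mirrors the constant used above):
public class XidFormatDemo {
    private static final String IP_PORT_SPLIT_CHAR = ":";

    public static void main(String[] args) {
        String ipAddress = "192.168.1.10";   // the TC's registered IP
        int port = 8091;                     // the TC's listen port
        long tranId = 4211690756L;           // snowflake-generated transaction id
        // prints 192.168.1.10:8091:4211690756, i.e. ip : port : transactionId
        System.out.println(ipAddress + IP_PORT_SPLIT_CHAR + port + IP_PORT_SPLIT_CHAR + tranId);
    }
}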
[Most of this content is based on https://saint.blog.csdn.net/article/details/126457129; recommended reading for anyone studying Seata.]