文章目录
- 启动类 Server
- SessionHolder初始化
- DefaultCoordinator初始化
- 初始化NettyRemotingServer
启动类 Server
seata-server的入口类在Server类中,源码如下:
public class Server {
/**
* The entry point of application.
*
* @param args the input arguments
* @throws IOException the io exception
*/
public static void main(String[] args) throws IOException {
// 获取端口,默认是8091
int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));
// create logger
final Logger logger = LoggerFactory.getLogger(Server.class);
if (ContainerHelper.isRunningInContainer()) {
logger.info("The server is running in container.");
}
//参数解析器,用来解析启动的配置,包括file.conf和registry.conf
//Note that the parameter parser should always be the first line to execute.
//Because, here we need to parse the parameters needed for startup.
ParameterParser parameterParser = new ParameterParser(args);
//initialize the metrics
MetricsManager.get().init();
// 把从配置文件中读取到的storeMode写入SystemProperty中,方便其他类使用
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
// netty的线程池
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
// 创建NettyRemotingServer实例,主要就是创建一个NettyServerBootstrap,负责与TM,RM进行通信
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
// 监听的端口,8091
nettyRemotingServer.setListenPort(parameterParser.getPort());
// 初始化UUIDGenerator,UUID生成器,基于雪花算法,用于生成全局事务id,分支事务id
// 多个Server实例配置不同的ServerNode,保证id的唯一性
UUIDGenerator.init(parameterParser.getServerNode());
// SessionHodler负责事务日志(状态)的持久化存储,
// 根据不同的存储模式来创建
SessionHolder.init(parameterParser.getStoreMode());
// 创建DefaultCoordinator实例并初始化,DefaultCoordinator是TC的核心事务逻辑处理类
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
// 将coordinator设置为事务消息处理器,处理netty接收到的事务请求
nettyRemotingServer.setHandler(coordinator);
// register ShutdownHook
ShutdownHook.getInstance().addDisposable(coordinator);
ShutdownHook.getInstance().addDisposable(nettyRemotingServer);
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(nettyRemotingServer.getListenPort());
try {
// 初始化netty,开始监听端口并阻塞在这里
nettyRemotingServer.init();
} catch (Throwable e) {
logger.error("nettyServer init error:{}", e.getMessage(), e);
System.exit(-1);
}
System.exit(0);
}
}
在阅读源码的时候,有些源码是要细看的,但是有些源码可以大致猜测一下它的作用,就直接略过去了,抓住真正的重点去看。
SessionHolder初始化
SessionHolder负责Session的持久化,一个session对象代表一个事务。SessionHolder包含了4个session管理器,用来操作session。
// 用于获取所有的session,以及session的创建,更新和删除
private static SessionManager ROOT_SESSION_MANAGER;
// 用于获取,更新所有异步commit的session
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于获取,更新所有需要重试commit的session
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于获取,更新所有需要重试rollback的session
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
初始化方法init
public static void init(String mode) {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
}
StoreMode storeMode = StoreMode.get(mode);
// 数据库存储模式,一般也是推荐用数据库
if (StoreMode.DB.equals(storeMode)) {p
// SPI方式加载SessionManager
// 这里4个SessionManager都是DataBaseSessionManager类的4个不同实例
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME}); // async.commit.data 表示是用来处理异步提交
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME}); // retry.commit.data 表示是用来处理重试提交的
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); // retry.rollback.data 表示是用来处理重试回滚的
} else if (StoreMode.FILE.equals(storeMode)) {
String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
DEFAULT_SESSION_STORE_FILE_DIR);
if (StringUtils.isBlank(sessionStorePath)) {
throw new StoreException("the {store.file.dir} is empty.");
}
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
new Object[] {ROOT_SESSION_MANAGER_NAME, sessionStorePath});
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
new Class[] {String.class, String.class}, new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME, null});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
new Class[] {String.class, String.class}, new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME, null});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
new Class[] {String.class, String.class}, new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME, null});
} else if (StoreMode.REDIS.equals(storeMode)) {
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
} else {
// unknown store
throw new IllegalArgumentException("unknown store mode:" + mode);
}
reload(storeMode);
}
DefaultCoordinator初始化
DefaultCoordinator是TC的核心事务逻辑处理类,如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是由DefaultCoordinator负责协调处理的。DefaultCoordinato通过RpcServer与远程的TM、RM通信来实现分支事务的提交、回滚等。
public DefaultCoordinator(RemotingServer remotingServer) {
this.remotingServer = remotingServer;
this.core = new DefaultCore(remotingServer);
}
在DefaultCoordinator里还创建了一个DefaultCore,该类是默认的 TC 事务操作实现,DefaultCoordinator的开启、提交、回滚全局事务,注册、提交、回滚分支事务都是委托给这个类。
public DefaultCore(RemotingServer remotingServer) {
List<AbstractCore> allCore = EnhancedServiceLoader.loadAll(AbstractCore.class,
new Class[]{RemotingServer.class}, new Object[]{remotingServer});
if (CollectionUtils.isNotEmpty(allCore)) {
for (AbstractCore core : allCore) {
coreMap.put(core.getHandleBranchType(), core);
}
}
}
在DefaultCore构造方法里又会去通过SPI方式加载AbstractCore的实现类,类名在META-INF.services/io.seata.server.coordinator.AbstractCore文件里。
将这4个实例缓存在DefaultCore中的coreMap里,分别是AT,TCC,SAGA和XA模式下的事务处理类。
然后调用DefaultCoordinator的初始化方法init
public void init() {
// 处理处于回滚状态可重试的事务的定时任务
retryRollbacking.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryRollbackingLock();
if (lock) {
try {
handleRetryRollbacking();
} catch (Exception e) {
LOGGER.info("Exception retry rollbacking ... ", e);
} finally {
SessionHolder.unRetryRollbackingLock();
}
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// 处理二阶段可以重试提交的状态可重试的事务的定时任务
retryCommitting.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.retryCommittingLock();
if (lock) {
try {
handleRetryCommitting();
} catch (Exception e) {
LOGGER.info("Exception retry committing ... ", e);
} finally {
SessionHolder.unRetryCommittingLock();
}
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// 处理二阶段异步提交的事务的定时任务
asyncCommitting.scheduleAtFixedRate(() -> {
// 默认都是true
boolean lock = SessionHolder.asyncCommittingLock();
if (lock) {
try {
// 处理异步提交
handleAsyncCommitting();
} catch (Exception e) {
LOGGER.info("Exception async committing ... ", e);
} finally {
SessionHolder.unAsyncCommittingLock();
}
}
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// 检查事务的第一阶段已经超时的事务,设置事务状态为TimeoutRollbacking,
// 该事务会由其他定时任务执行回滚操作
timeoutCheck.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.txTimeoutCheckLock();
if (lock) {
try {
timeoutCheck();
} catch (Exception e) {
LOGGER.info("Exception timeout checking ... ", e);
} finally {
SessionHolder.unTxTimeoutCheckLock();
}
}
}, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
// 根据unlog的保存天数调用RM删除unlog
undoLogDelete.scheduleAtFixedRate(() -> {
boolean lock = SessionHolder.undoLogDeleteLock();
if (lock) {
try {
undoLogDelete();
} catch (Exception e) {
LOGGER.info("Exception undoLog deleting ... ", e);
} finally {
SessionHolder.unUndoLogDeleteLock();
}
}
}, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}
主要就是创建了5个定时任务,主要用于事务的重试机制,因为分布式环境的不稳定性会造成事务处于中间状态,所以要通过不断的重试机制来实现事务的最终一致性。这里面还有一个处理二阶段异步提交的事务的定时任务。
初始化NettyRemotingServer
在上面创建了NettyRemotingServer,所以在最后需要进行初始化,开始监听端口并阻塞在这里。
@Override
public void init() {
// 注册与Client通信的Processor
registerProcessor();
// 再调用父类的init
if (initialized.compareAndSet(false, true)) {
super.init();
}
}
NettyRemotingServer初始化时主要做了两件事:
1、注册与Client通信的Processor,每个事务请求类型都对应一个Processor。当NettyRemotingServer接收到请求后,从注册的Processor列表中选出一个适合的Processor进行处理。
private void registerProcessor() {
// 1. 注册核心的ServerOnRequestProcessor,即与事务处理相关的Processor,
// 如:全局事务开始、提交,分支事务注册、反馈当前状态等。
// getHandler就是DefaultCoordinator
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. 注册ResponseProcessor,ResponseProcessor用于处理当Server端主动发起请求时,Client端回复的消息
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
// 3. Client端发起RM注册请求时对应的Processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. Client端发起TM注册请求时对应的Processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. Client端发送心跳请求时对应的Processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
2、调用父类AbstractNettyRemotingServer去启动Netty服务端
@Override
public void init() {
super.init();
// 启动Netty
serverBootstrap.start();
}
继续调用父类AbstractNettyRemoting方法,创建一个定时任务。
public void init() {
// 用于定时清除超时的请求,3s执行一次
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
if (entry.getValue().isTimeout()) {
futures.remove(entry.getKey());
entry.getValue().setResultMessage(null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}