7. 网络请求
7.1 TransactionManager
事务管理器,在客户端主要用于发起事务请求、提交事务、回滚事务请求等,用于跟 TC 进行通信的类,其中获取当前接口的实现类是通过 TransactionManagerHolder 进行获取,然后通过 SPI 接口获取到默认实现类,这里的默认实现类是 DefaultTransactionManager
public interface TransactionManager {
/**
* 开启全局事务
*
* @param applicationId 应用id,指明当前事务所属的应用
* @param transactionServiceGroup 事务服务的分组
* @param name 全局事务的名称
* @param timeout 全局事务的超时时间
*/
String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException;
/**
* 根据指定的xid进行事务的提交
*/
GlobalStatus commit(String xid) throws TransactionException;
/**
* 根据xid进行事务回滚
*/
GlobalStatus rollback(String xid) throws TransactionException;
/**
* 获取到当前事务的状态
*/
GlobalStatus getStatus(String xid) throws TransactionException;
/**
* 上报对应的全局事务的状态
*/
GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException;
}
7.2 RMClient
RM客户端实例化工具类,用于创建 RM 客户端, GlobalTransactionScanner.initClient() 方法,在bean对象创建时进行初始化,与之相同的还有一个 TMClient 逻辑都一样
public class RMClient {
public static void init(String applicationId, String transactionServiceGroup) {
//通过单例模式创建一个Netty的 RM客户端端
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
//设置默认的资源管理器
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
/**
* 设置事务方法的处理器,默认 DefaultRMHandler,其中根据类型选择对于模式的处理器
* RMHandlerAT
* RMHandlerSaga
* RMHandlerTCC
* RMHandlerXA
*/
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
//初始化时注册对于方法类型的处理器
rmNettyRemotingClient.init();
}
}
7.3 RemotingClient
远程调用客户端,用于 Client 与 Server 的通信,下面是依赖结构图,目前客户端的实现就以下两个:(TM用于一阶段的通信,RM用于二阶段接收TC对事务的处理);
- RmNettyRemotingClient:资源管理器客户端,管理分支事务处理的资源,驱动分支事务提交或回滚,与TC进行通信以注册分支事务和报告分支事务的状态
- TmNettyRemotiongClient:事务管理器,定义全局事务的范围,开始全局事务、提交或回滚全局事务
7.3.1 TmNettyRemotiongClient
采用了单例模式,底层使用 Netty 作为通讯框架,TM 主要用于发送请求然后处理发送请求过后的数据
public static TmNettyRemotingClient getInstance() {
if (instance == null) {
synchronized (TmNettyRemotingClient.class) {
if (instance == null) {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
//配置消费的线程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
nettyClientConfig.getClientWorkerThreads()),
RejectedPolicies.runsOldestTaskPolicy());
//创建TM客户端
instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
init()
初始化方法,在上面 GlobalTransactional.initClient() 中进行调用,注册对应消息类型的处理器
public void init() {
// 注册消息类型的处理器
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
getClientChannelManager().reconnect(transactionServiceGroup);
}
}
}
registerProcessor()
TM 注册的消息类型处理是以下几种:
- MessageType.TYPE_SEATA_MERGE_RESULT:合并请求的结果()
- MessageType.TYPE_GLOBAL_BEGIN_RESULT:全局事务开启的结果处理()
- MessageType.TYPE_GLOBAL_COMMIT_RESULT:提交事务的结果处理
- MessageType.TYPE_GLOBAL_REPORT_RESULT:重新上报的结果处理
- MessageType.TYPE_GLOBAL_ROLLBACK_RESULT:回滚的结果处理
- MessageType.TYPE_GLOBAL_STATUS_RESULT:状态的结果处理
- MessageType.TYPE_REG_CLT_RESULT
- MessageType.TYPE_BATCH_RESULT_MSG:批量发送的结果
- MessageType.TYPE_HEARTBEAT_MSG:心跳消息的处理
private void registerProcessor() {
// 1.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
..................省略代码
}
7.3.2 RmNettyRemotiongClient
主要用于处理 TC 端主动下发的数据监听,用于 二阶段处理,初始化方法跟 TM 一样,只是注册的处理器不一样
registerProcessor()
RM 管理主要用于处理来自服务端下发的请求,主要注册的类型有以下:
- MessageType.TYPE_BRANCH_COMMIT:分支提交(RmBranchCommitProcessor)
- MessageType.TYPE_BRANCH_ROLLBACK:分支回滚(RmBranchRollbackProcessor)
- MessageType.TYPE_RM_DELETE_UNDOLOG:删除undolog(RmUndoLogProcessor)
- MessageType.TYPE_SEATA_MERGE_RESULT:合并结果(ClientOnResponseProcessor)
- MessageType.TYPE_BRANCH_REGISTER_RESULT:分支注册结果处理
- MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT:状态报告结果
- MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT:全局锁查询结果处理
- MessageType.TYPE_REG_RM_RESULT
- MessageType.TYPE_BATCH_RESULT_MSG:批量发送结果信息
- MessageType.TYPE_HEARTBEAT_MSG:心跳结果(ClientHeartbeatProcessor)
private void registerProcessor() {
// 1.registry rm client handle branch commit processor
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
// 2.registry rm client handle branch rollback processor
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
...........省略相同代码
}
TM、RM 都是采用 netty 进行通信,可以在两个对象的 构造方法 中看到设置的最终 handler 对象是 io.seata.core.rpc.netty.AbstractNettyRemotingClient.ClientHandler 对象;
AbstractNettyRemoting.processMessage() 中获取的又是根据消息类型区分的 RemotingProcessor 处理器,就是上面注册的处理器
class ClientHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
//调用 io.seata.core.rpc.netty.AbstractNettyRemoting.processMessage 的方法
processMessage(ctx, (RpcMessage) msg);
}
}
7.3.3 RemotingProcessor
- ClientHeartbeatProcessor:客户端心跳处理
- ClientOnResponseProcessor:客户端处理 TC 的回复消息
- ServerHeartbeatProcessor:服务端心跳处理
- ServerOnRequestProcessor:服务端用于处理客户端发送的分支注册、分支状态上报、全局事务开启、全局锁查询、全局事务回滚、全局事务状态的请求
- ServerOnResponseProcessor:服务端处理客户端发送的分支提交结果、分支回滚结果的处理器
- RmBranchCommitProcessor:分支提交的处理器
- RmBranchRollbackProcessor:分支回滚处理器
- RmUndoLogProcessor:undolog删除处理器
7.4 MessageTypeAware(请求实体)
请求实体的接口,就只定义了一个方法,用于请求的类型
public interface MessageTypeAware {
/**
* 获取到请求类型的code码
*/
short getTypeCode();
}
7.4.1 AbstractTransactionRequestToTC
对于客户端来说,请求体就是 TM --> TC ,seata中服务端的实体处理结构就是下面这样,抽象出了一个 AbstractTransactionRequestToTC 的抽象类,(tm发起的请求实体,tc就需要转换成对应的实体处理)
public abstract class AbstractTransactionRequestToTC extends AbstractTransactionRequest {
/**
* TC入栈处理器
*/
protected TCInboundHandler handler;
/**
* 设置TC的入栈处理器
*
* @param handler the handler
*/
public void setTCInboundHandler(TCInboundHandler handler) {
this.handler = handler;
}
}
7.4.2 AbstractTransactionRequestToRM
对于TC来说,给RM发送请求的实体对象是 AbstractTransactionRequestToRM,那么 RM就需要转换成 AbstractTransactionRequestToRM 下面的对于类型
public abstract class AbstractTransactionRequestToRM extends AbstractTransactionRequest {
/**
* 设置RM入栈处理器
*/
protected RMInboundHandler handler;
public void setRMInboundMessageHandler(RMInboundHandler handler) {
this.handler = handler;
}
}
7.4.3 RMInboundHandler
public interface RMInboundHandler {
/**
* 处理分支提交的请求
*/
BranchCommitResponse handle(BranchCommitRequest request);
/**
* 处理分支回滚的请求
*/
BranchRollbackResponse handle(BranchRollbackRequest request);
/**
* 处理undolog删除日志的请求
*/
void handle(UndoLogDeleteRequest request);
}
7.4.4 TCInboundHandler
public interface TCInboundHandler {
/**
* 处理开启全局事务的请求
*/
GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
/**
* 处理全局事务提交的请求
*/
GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);
/**
* 处理事务回滚的请求
*/
GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);
/**
* 处理分支注册的请求
*/
BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
/**
* 处理分支状态上报的请求
*/
BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);
/**
* 查询全局锁
*/
GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);
/**
* 全局事务状态查询
*/
GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);
/**
* 上报事务状态该的请求
*/
GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}
上面的两个抽象类中,为什么要在实体类中定义两个 handler 呢?
让我一起看看 seata 的实体设置,这样的用意是什么。
例如:下面这个 分支提交请求 ,其中覆写了 handle() ,而这个 hander 的类型是上面两种 TCInboundHandler、RMInboundHandler 两个接口,两个接口给每一个请求方法都定义了一个方法,参数不同而已,也就是说,通过下面这种方法可以根据指定的参数调用到对应的处理方法,只需要设置定义的 handler 即可
public class BranchCommitRequest extends AbstractBranchEndRequest {
@Override
public short getTypeCode() {
return MessageType.TYPE_BRANCH_COMMIT;
}
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this);
}
}
io.seata.rm.AbstractRMHandler#onRequest :这里的调用路径就是 netty 收到 TC 的分支提交请求,然后根据类型找到 RmBranchCommitProcessor 然后在 process() 中 调用 DefaultRMHandler.onRequest() 方法
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToRM)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
//客户端设置自己为handler,就可以根据对应的请求类型调用到指定的方法
transactionRequest.setRMInboundMessageHandler(this);
return transactionRequest.handle(context);
}