begin;
create database db1;
ERROR 1105 (HY000): TException, msg: org.apache.thrift.TException: This is in a transaction, only insert, commit, rollback is acceptable
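The error shows that begin, commit, and rollback can only be used together with insert statements, which suggests that Doris's transaction support is limited to data loading. The load operations covered by global transactions are listed below.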
FRONTEND old dppload, mini load, insert stmt (not streaming type)
BACKEND_STREAMING streaming load
INSERT_STREAMING insert stmt (streaming type), update stmt
ROUTINE_LOAD_TASK routine load task
BATCH_LOAD_JOB load job v2 for broker load
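The restriction seen in the error message at the top can be pictured as a small guard in front of statement execution. The following is only a sketch under stated assumptions: TxnStatementGuardSketch, Kind, classify, and execute are hypothetical names, the prefix-based classification is a simplification, and none of this is taken from the Doris source.

```java
import java.util.Locale;

// Hypothetical sketch of "only insert, commit, rollback inside a transaction".
// Not the Doris implementation; all names here are illustrative.
class TxnStatementGuardSketch {
    enum Kind { BEGIN, INSERT, COMMIT, ROLLBACK, OTHER }

    private boolean inTransaction = false;

    // Very rough classification by statement prefix (an assumption of this sketch).
    static Kind classify(String sql) {
        String s = sql.trim().toLowerCase(Locale.ROOT);
        if (s.startsWith("begin")) return Kind.BEGIN;
        if (s.startsWith("insert")) return Kind.INSERT;
        if (s.startsWith("commit")) return Kind.COMMIT;
        if (s.startsWith("rollback")) return Kind.ROLLBACK;
        return Kind.OTHER;
    }

    // Accepts or rejects a statement depending on whether a transaction is open.
    void execute(String sql) {
        Kind kind = classify(sql);
        if (inTransaction) {
            switch (kind) {
                case INSERT:
                    break; // inserts are allowed inside the transaction
                case COMMIT:
                case ROLLBACK:
                    inTransaction = false; // both end the transaction
                    break;
                default:
                    // Everything else is rejected (including a nested begin, as a simplification).
                    throw new IllegalStateException(
                        "This is in a transaction, only insert, commit, rollback is acceptable");
            }
        } else if (kind == Kind.BEGIN) {
            inTransaction = true;
        }
    }

    boolean inTransaction() {
        return inTransaction;
    }
}
```

With this guard, "create database db1;" is rejected after "begin;" and accepted again once the transaction has been committed or rolled back.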
Global transaction state machine
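In fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java, the Env constructor contains the following code: this.globalTransactionMgr = new GlobalTransactionMgr(this). The transaction manager class mainly handles three transaction operations: 1. begin 2. commit 3. abort. Its members show that GlobalTransactionMgr keeps one DatabaseTransactionMgr for each database in the system, so transaction management is spread across databases. TransactionIdGenerator is its transaction ID generator, a centralized, globally ordered generator of the unified-clock kind. The class provides the following transaction APIs.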
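beginTransaction ultimately calls beginTransaction on the DatabaseTransactionMgr of the target database. Its callers fall into the following groups: the begin-transaction APIs that the thrift server exposes externally (to BE); NativeInsertStmt, which opens a transaction when the statement is executed; DeleteJob, which opens a transaction when it runs; the BrokerLoadJob, SparkLoadJob, RoutineLoadTask, and CanalSyncChannel tasks, which open a transaction when they run; the transaction wrapper provided for the new optimizer Nereids; and the beginTxn member function of StmtExecutor, which executeForTxn calls the first time a transaction begins [handleInsertStmt->executeForTxn->beginTxn].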
public class FrontendServiceImpl --> loadTxnBeginImpl --> Env.getCurrentGlobalTransactionMgr().beginTransaction(
--> beginTxnImpl --> Env.getCurrentGlobalTransactionMgr().beginTransaction(
public class NativeInsertStmt extends InsertStmt --> public void analyze(Analyzer analyzer) --> transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
public class DeleteHandler implements Writable --> process --> transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction
public class BrokerLoadJob extends BulkLoadJob --> beginTxn --> transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction
public class SparkLoadJob extends BulkLoadJob --> beginTxn --> transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction
public abstract class RoutineLoadTaskInfo --> beginTxn --> transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction
public class CanalSyncChannel extends SyncChannel --> beginTxn --> txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction
public class Transaction [transaction wrapper for Nereids] --> Transaction --> Env.getCurrentGlobalTransactionMgr().beginTransaction
public class StmtExecutor --> beginTxn --> txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction
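To make the delegation concrete, here is a minimal sketch of a global manager that draws IDs from one shared generator and hands each transaction to a per-database manager. It assumes a much simpler shape than the real classes: GlobalTxnMgrSketch, DbTxnMgrSketch, and their methods are hypothetical stand-ins, and an AtomicLong plays the role of TransactionIdGenerator.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a per-database transaction manager.
class DbTxnMgrSketch {
    private final long dbId;
    private final Map<Long, String> labelByTxnId = new ConcurrentHashMap<>();

    DbTxnMgrSketch(long dbId) {
        this.dbId = dbId;
    }

    long dbId() {
        return dbId;
    }

    // Registers the transaction under this database and returns its ID.
    long begin(long txnId, String label) {
        labelByTxnId.put(txnId, label);
        return txnId;
    }

    int runningTxnCount() {
        return labelByTxnId.size();
    }
}

// Hypothetical stand-in for the global manager: one ID sequence, many per-database managers.
class GlobalTxnMgrSketch {
    private final AtomicLong idGenerator = new AtomicLong(0);
    private final Map<Long, DbTxnMgrSketch> dbMgrs = new ConcurrentHashMap<>();

    // Looks up (or lazily creates, an assumption of this sketch) the manager for a database.
    DbTxnMgrSketch dbMgr(long dbId) {
        return dbMgrs.computeIfAbsent(dbId, DbTxnMgrSketch::new);
    }

    // IDs are globally ordered, while bookkeeping is kept per database.
    long beginTransaction(long dbId, String label) {
        long txnId = idGenerator.incrementAndGet();
        return dbMgr(dbId).begin(txnId, label);
    }
}
```

Transactions opened against different databases still receive increasing IDs from the same sequence, while each database only tracks its own transactions.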
commitTransaction ultimately calls commitTransaction on the DatabaseTransactionMgr of the target database. It is mainly used by BrokerLoadJob and SparkLoadJob to commit their transactions.
public class BrokerLoadJob extends BulkLoadJob --> createLoadingTask --> Env.getCurrentGlobalTransactionMgr().commitTransaction
public class SparkLoadJob extends BulkLoadJob --> tryCommitJob --> Env.getCurrentGlobalTransactionMgr().commitTransaction
abortTransaction ultimately calls abortTransaction on the DatabaseTransactionMgr of the target database. Its call stacks are similar to those of beginTransaction, so they are not analyzed in detail here.
public class InsertIntoTableCommand --> run --> Env.getCurrentGlobalTransactionMgr().abortTransaction
public class DeleteHandler implements Writable --> cancelJob --> globalTransactionMgr.abortTransaction
public abstract class LoadJob extends --> unprotectedExecuteCancel -->Env.getCurrentGlobalTransactionMgr().abortTransaction
public class CancelLoadAction extends RestBaseController --> execute --> Env.getCurrentGlobalTransactionMgr().abortTransaction
public class Transaction [transaction wrapper for Nereids] --> executeInsertIntoTableCommand --> Env.getCurrentGlobalTransactionMgr().abortTransaction
public class StmtExecutor --> executeByLegacy --> Env.getCurrentGlobalTransactionMgr().abortTransaction
public class FrontendServiceImpl --> loadTxnRollbackImpl --> Env.getCurrentGlobalTransactionMgr().abortTransaction
--> rollbackTxnImpl --> Env.getCurrentGlobalTransactionMgr().abortTransaction
commitAndPublishTransaction differs from the flows above: it first calls commitTransaction, then calls waitForTransactionFinished on the dbTransactionMgr to wait until the data is visible, that is, until Publish completes.
public class DeleteHandler implements Writable --> unprotectedCommitJob --> globalTransactionMgr.commitAndPublishTransaction
public class Transaction [transaction wrapper for Nereids] --> executeInsertIntoTableCommand --> globalTransactionMgr.commitAndPublishTransaction
public class StmtExecutor --> handleInsertStmt --> Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction
public class FrontendServiceImpl --> loadTxnCommitImpl --> Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction
--> commitTxnImpl --> Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction
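The commit-then-wait shape of commitAndPublishTransaction can be sketched as follows. This is an illustration under assumptions, not the Doris code: CommitAndPublishSketch, finishPublish, and the three-value Status enum are hypothetical, a CountDownLatch stands in for whatever waitForTransactionFinished uses internally, and returning false on timeout is a choice of this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: commit first, then block until the publish step makes the data visible.
class CommitAndPublishSketch {
    enum Status { PREPARE, COMMITTED, VISIBLE }

    private volatile Status status = Status.PREPARE;
    private final CountDownLatch visible = new CountDownLatch(1);

    Status status() {
        return status;
    }

    // Step 1: the commit itself.
    void commit() {
        if (status != Status.PREPARE) {
            throw new IllegalStateException("cannot commit from " + status);
        }
        status = Status.COMMITTED;
    }

    // Called by the publish side (a background publisher in this sketch) once data is visible.
    void finishPublish() {
        if (status != Status.COMMITTED) {
            throw new IllegalStateException("cannot publish from " + status);
        }
        status = Status.VISIBLE;
        visible.countDown();
    }

    // Step 1 + step 2: commit, then wait for visibility up to the timeout.
    // Returns false on timeout; the transaction stays COMMITTED in that case.
    boolean commitAndPublish(long timeoutMillis) {
        commit();
        try {
            return visible.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The point of the split is that a timeout while waiting does not undo the commit: the caller only learns that the data was not yet visible when it stopped waiting.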
preCommitTransaction2PC ultimately calls preCommitTransaction2PC on the DatabaseTransactionMgr of the target database. It is exposed only through the thrift interface.
public class FrontendServiceImpl --> loadTxnPreCommitImpl --> Env.getCurrentGlobalTransactionMgr().preCommitTransaction2PC
commitTransaction2PC ultimately calls commitTransaction2PC on the DatabaseTransactionMgr of the target database. It is exposed only through the thrift interface.
public class FrontendServiceImpl --> loadTxn2PCImpl --> Env.getCurrentGlobalTransactionMgr().commitTransaction2PC
abortTransaction2PC ultimately calls abortTransaction2PC on the DatabaseTransactionMgr of the target database. It is exposed only through the thrift interface.
public class FrontendServiceImpl --> loadTxn2PCImpl --> Env.getCurrentGlobalTransactionMgr().abortTransaction2PC
fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java publishVersion() --> finishTransaction
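Taken together, the APIs above suggest a small state machine: begin creates a transaction, pre-commit and commit move it forward, publish makes it visible, and abort ends it early. The sketch below encodes one plausible transition table. The state names and the allowed transitions are assumptions inferred from the API names in this section, not copied from the Doris source, and TxnStateMachineSketch is a hypothetical class.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical transition table for a load transaction; the real rules may differ.
class TxnStateMachineSketch {
    enum Status { PREPARE, PRECOMMITTED, COMMITTED, VISIBLE, ABORTED }

    private static final Map<Status, EnumSet<Status>> ALLOWED = new EnumMap<>(Status.class);

    static {
        // begin -> (2PC pre-commit | commit | abort)
        ALLOWED.put(Status.PREPARE,
            EnumSet.of(Status.PRECOMMITTED, Status.COMMITTED, Status.ABORTED));
        // 2PC second phase: commit or abort
        ALLOWED.put(Status.PRECOMMITTED, EnumSet.of(Status.COMMITTED, Status.ABORTED));
        // publish finishes a committed transaction (assumed: no abort after commit)
        ALLOWED.put(Status.COMMITTED, EnumSet.of(Status.VISIBLE));
        // terminal states
        ALLOWED.put(Status.VISIBLE, EnumSet.noneOf(Status.class));
        ALLOWED.put(Status.ABORTED, EnumSet.noneOf(Status.class));
    }

    private Status status = Status.PREPARE;

    Status status() {
        return status;
    }

    // Applies the transition if the table allows it and reports whether it happened.
    boolean transitionTo(Status next) {
        if (!ALLOWED.get(status).contains(next)) {
            return false;
        }
        status = next;
        return true;
    }
}
```

In this reading, preCommitTransaction2PC corresponds to PREPARE to PRECOMMITTED, commitTransaction2PC and abortTransaction2PC to the two exits from PRECOMMITTED, and finishTransaction to COMMITTED to VISIBLE.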
FrontendServiceImpl Txn Op
FE TXN Service
FE acts as the server. As the code analysis above shows, the following thrift interfaces wrap the transaction APIs provided by GlobalTransactionMgr.
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException
public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws TException
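The loadTxnBegin interface is mainly called by FE itself. The code shows that if this FE is the master, it uses the beginTransaction function provided by GlobalTransactionMgr; otherwise it forwards the beginTxn request to the real master FE through the MasterTxnExecutor class. There is also one call site in BE, which is analyzed in the next section.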
public TLoadTxnCommitResult loadTxnPreCommit(TLoadTxnCommitRequest request) throws TException
public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException
public TLoadTxn2PCResult loadTxn2PC(TLoadTxn2PCRequest request) throws TException
public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException
public TCommitTxnResult commitTxn(TCommitTxnRequest request) throws TException
public TRollbackTxnResult rollbackTxn(TRollbackTxnRequest request) throws TException
public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request) throws TException
The waitingTxnStatus interface simply calls public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) of the GlobalTransactionMgr class. As with loadTxnBegin, if this FE is the master it uses getWaitingTxnStatus of GlobalTransactionMgr; otherwise it uses the MasterTxnExecutor class to send the corresponding request to the FE master.
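The "handle locally if master, otherwise forward" pattern described for loadTxnBegin and waitingTxnStatus can be sketched in a few lines. FeNodeSketch is a hypothetical class: a direct method call stands in for the thrift request that MasterTxnExecutor would send, and a plain counter stands in for the master's transaction state.

```java
// Hypothetical sketch of master-or-forward request handling between FE nodes.
class FeNodeSketch {
    private final FeNodeSketch master; // null means this node is the master
    private long nextTxnId = 1;        // only meaningful on the master
    private int forwarded = 0;         // how many requests this node passed on

    FeNodeSketch(FeNodeSketch master) {
        this.master = master;
    }

    boolean isMaster() {
        return master == null;
    }

    // Master: allocate locally. Non-master: forward to the master and relay its answer.
    long beginTxn() {
        if (isMaster()) {
            return nextTxnId++;
        }
        forwarded++;
        return master.beginTxn();
    }

    int forwardedCount() {
        return forwarded;
    }
}
```

Because every request ends up at the master, transactions begun through a follower and through the master share one ID sequence.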
BE TXN Client
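BE acts as the client and sends TXN requests to FE over the thrift protocol. The ClientConnection class is "a scoped client connection to help manage clients from a client cache", that is, a wrapper around the various client types that helps the client cache manage them, while FrontendServiceClient is the client that actually sends requests to the FE server.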
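For the loadTxnBegin interface provided by FE, BE issues the call from the begin_txn function of StreamLoadExecutor. Because of how stream load works, the coordinator BE needs to submit the begin txn request to FE.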
public TLoadTxnCommitResult loadTxnPreCommit(TLoadTxnCommitRequest request) throws TException
public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException
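loadTxnPreCommit and loadTxnCommit share one request type, and both are requested by the corresponding functions of the stream_load_executor class.
The public TLoadTxn2PCResult loadTxn2PC(TLoadTxn2PCRequest request) throws TException interface is likewise requested by the operate_txn_2pc function of the stream_load_executor class.
The public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException interface is likewise requested by the rollback_txn function of the stream_load_executor class.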
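From the client side, these calls form a simple lifecycle: begin, write data, then commit, with rollback on failure. The sketch below shows that ordering in Java for consistency with the FE code in this post, although the real caller is the C++ StreamLoadExecutor. StreamLoadFlowSketch, TxnService, RecordingFe, and runLoad are hypothetical, the method parameters are simplified stand-ins for the thrift request structs, and the ordering itself is an assumption rather than something read from the BE source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the client-side transaction lifecycle for one load.
class StreamLoadFlowSketch {
    // Simplified stand-in for the FE thrift service seen by the client.
    interface TxnService {
        long loadTxnBegin(String label);
        void loadTxnCommit(long txnId);
        void loadTxnRollback(long txnId, String reason);
    }

    // In-memory fake that records the calls it receives, for illustration only.
    static class RecordingFe implements TxnService {
        final List<String> calls = new ArrayList<>();
        private long nextId = 1;

        public long loadTxnBegin(String label) {
            calls.add("begin:" + label);
            return nextId++;
        }

        public void loadTxnCommit(long txnId) {
            calls.add("commit:" + txnId);
        }

        public void loadTxnRollback(long txnId, String reason) {
            calls.add("rollback:" + txnId);
        }
    }

    // begin -> write -> commit; any runtime failure after begin triggers a rollback.
    static boolean runLoad(TxnService fe, String label, Runnable writeData) {
        long txnId = fe.loadTxnBegin(label);
        try {
            writeData.run();
            fe.loadTxnCommit(txnId);
            return true;
        } catch (RuntimeException e) {
            fe.loadTxnRollback(txnId, String.valueOf(e.getMessage()));
            return false;
        }
    }
}
```

A two-phase variant would replace the single commit with a pre-commit followed by a later loadTxn2PC call that either commits or aborts.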
TxnManager
The TxnManager class in be/src/olap/txn_manager.h is described in the source as "txn manager is used to manage mapping between tablet and txns".