FeServer feServer = new FeServer(Config.rpc_port);
feServer.start();
FeServer(Doris frontend thrift server)职责是负责FE和BE之间通信。如下即为初始化中关于FeServer类的构造函数和start函数的具体代码。其start函数流程和构建thrift server行为一致,这里不介绍,仅说明其提供的接口。
public class FeServer {
private static final Logger LOG = LogManager.getLogger(FeServer.class);
private int port; private ThriftServer server;
public FeServer(int port) { this.port = port; }
public void start() throws IOException {
FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
Logger feServiceLogger = LogManager.getLogger(FrontendServiceImpl.class);
FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance(
FrontendServiceImpl.class.getClassLoader(), FrontendServiceImpl.class.getInterfaces(),
(proxy, method, args) -> {
long begin = System.currentTimeMillis();
String name = method.getName();
if (MetricRepo.isInit) { MetricRepo.THRIFT_COUNTER_RPC_ALL.getOrAdd(name).increase(1L); }
feServiceLogger.debug("receive request for {}", name);
Object r = null;
try { r = method.invoke(service, args);
} catch (Throwable t) {
feServiceLogger.warn("errors while process request for {}", name, t);
// If exception occurs, do not deal it, just keep as the previous
throw t;
} finally {
feServiceLogger.debug("finish process request for {}", name);
if (MetricRepo.isInit) {
long end = System.currentTimeMillis();
MetricRepo.THRIFT_COUNTER_RPC_LATENCY.getOrAdd(name).increase(end - begin);
}
}
return r;
});
// setup frontend server
TProcessor tprocessor = new FrontendService.Processor<>(instance);
server = new ThriftServer(port, tprocessor);
server.start();
LOG.info("thrift server started.");
}
}
Frontend service用于服务通过thrift协议过来的所有请求。其构造函数主要关注exeEnv和MasterImple两个类。
// Frontend service used to serve all request for this frontend through thrift protocol
public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
private MasterImpl masterImpl;
private ExecuteEnv exeEnv;
// key is txn id,value is index of plan fragment instance, it's used by multi table request plan
private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap =
new ConcurrentHashMap<>(64);
public FrontendServiceImpl(ExecuteEnv exeEnv) {
masterImpl = new MasterImpl(); this.exeEnv = exeEnv;
}
在 FE 端与 BE 端均存在一个任务 Queue,如上图所示,从当前版本来看 thrift 实现的与 BE 之间的交互主要是用 AgentClient 来承载,而查看代码仅用于 snapshot 的管理过程,其他的并非通过该接口实现。注 这里存在疑问 gRpc 和 thrift 接口混着用的目的,不太明确
public TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(TConfirmUnusedRemoteFilesRequest request)
public TGetDbsResult getDbNames(TGetDbsParams params) throws TException
private static ColumnDef initColumnfromThrift(TColumnDesc tColumnDesc, String comment)
public TAddColumnsResult addColumns(TAddColumnsRequest request) throws TException
public TGetTablesResult getTableNames(TGetTablesParams params) throws TException
public TListTableStatusResult listTableStatus(TGetTablesParams params) throws TException
public TListPrivilegesResult listTablePrivilegeStatus(TGetTablesParams params) throws TException
public TListPrivilegesResult listSchemaPrivilegeStatus(TGetTablesParams params) throws TException
public TListPrivilegesResult listUserPrivilegeStatus(TGetTablesParams params) throws TException
public TDescribeTableResult describeTable(TDescribeTableParams params) throws TExceptionpublic
TDescribeTablesResult describeTables(TDescribeTablesParams params) throws TException
public TFetchSchemaTableDataResult fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException
public TInitExternalCtlMetaResult initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException
public TQueryStatsResult getQueryStats(TGetQueryStatsRequest request) throws TException
public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfosRequest request)
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException
public TShowVariableResult showVariables(TShowVariableRequest params) throws TException
public TMasterOpResult forward(TMasterOpRequest params) throws TException
public TMySqlLoadAcquireTokenResult acquireToken() throws TException
public TCheckAuthResult checkAuth(TCheckAuthRequest request) throws TException
public TFeResult updateExportTaskStatus(TUpdateExportTaskStatusRequest request) throws TException
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException
public TMasterResult finishTask(TFinishTaskRequest request) throws TException
public TMasterResult report(TReportRequest request) throws TException
public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request)
public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequest request)
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException
public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws TException
public TLoadTxnCommitResult loadTxnPreCommit(TLoadTxnCommitRequest request) throws TException
public TLoadTxn2PCResult loadTxn2PC(TLoadTxn2PCRequest request) throws TException
public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException
public TCommitTxnResult commitTxn(TCommitTxnRequest request) throws TException
public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException
public TRollbackTxnResult rollbackTxn(TRollbackTxnRequest request) throws TException
public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request) throws TException
public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException
public TGetBinlogResult getBinlog(TGetBinlogRequest request) throws TException
public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TException
public TRestoreSnapshotResult restoreSnapshot(TRestoreSnapshotRequest request) throws TException
public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException
public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException
public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) throws TException