前言
本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。
正文
我们知道Impala执行一条SQL的主要流程包括:
- 指定单节点执行计划,将SQL转换为一颗包含若干不同计划结点
PlanNode
的计划树PlanTree
; - 制定分布式执行计划,将单节点计划树拆分为若干片段
Fragment
,以便在分布式集群上分配调度; - 分配与调度执行计划片段到执行节点;
- 各执行结点根据
Fragment
创建若干执行实例Instance
,在一个Instance
中Fragment
的每个PlanNode
对应生成一个执行结点ExecNode
; - 各执行结点完成各自的任务(如扫描数据、聚合、排序等等),数据以行批
RowBatch
的形式在结点间传递; - 数据最终汇集到根结点作为查询结果返回给客户端,SQL执行完成;
在这一流程中,各个执行结点各司其职、逻辑解耦的设计使得Impala的开发与优化变得清晰与方便。在各类执行结点中,扫描结点ScanNode
作为一个大类包括了各种数据源的扫描结点派生,负责了各种数据源的扫描,是Impala中最重要的结点之一。本文将以执行结点中的Kudu扫描结点为例子介绍ScanNode
的主要结构和执行流程。
ScanNode的继承与派生关系
Impala支持多种数据源,每种数据都对应了一种扫描结点,这些扫描结点都派生自ScanNode
类,具体派生关系如图所示:
图中ExecNode
是所有执行结点的基类,主要定义了Prepare
、Open
、GetNext
和Close
四个接口,所有的执行结点都需要实现这些方法,完成准备、开启、获取下一批数据和关闭四种逻辑,整个执行树ExecTree
的开关与执行也正是由根结点到叶结点地调用这些方法。
图中ScanNode
作为所有扫描结点的基类,直接继承了ExecNode
并在其基础之上增加了ScanRange
、runtime filters和许多扫描性能相关的计时器、计数器,另外还有一个负责多线程扫描使用的内部类ScannerThreadState
。
如图所示,ScanNode
又进一步派生出四个类,分别对应了自定义数据源(DataSource)、HBase、Kudu和Hdfs。其中Kudu和Hdfs都支持了MT_DOP功能(Impala中提升查询并发度的功能,可以手动指定运行多个实例来提升性能),所以还包括了MT和非MT两个版本的扫描结点。
各个扫描结点为了完成对应数据源的扫描工作,可能还会包含各自的扫描器类,如KuduScanner
包括了连接Kudu、物化数据等逻辑。而HdfsScanner
更加复杂,根据数据储存格式又分为了文本格式扫描器HdfsTextScanner
、列存格式扫描器HdfsCoulumnarScanner
等,HdfsCoulumnarScanner
又进一步派生出了ORC格式、Parquet格式对应的扫描器。
由于ScanNode
的派生类众多,尤其是Impala主力支持的Hdfs下还有众多Scanner类,想要一一介绍则篇幅过长,而KuduScanNode
的相关代码量适中又包含了完整全面的相关逻辑,所以本文选择了Kudu的扫描结点作为例子来进行介绍。
Kudu扫描结点的基类KuduScanNodeBase
KuduScanNodeBase
继承了ScanNode
,是KuduScanNodeMT
和KuduScanNode
的基类,包括了两者共通的一些逻辑和成员,是比较简单的一个类,我们直接看其定义:
class KuduScanNodeBase : public ScanNode {
public:
KuduScanNodeBase(ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs);
~KuduScanNodeBase();
virtual Status Prepare(RuntimeState* state) override;
virtual Status Open(RuntimeState* state) override;
// GetNext被定义为纯虚函数,需要KuduScanNode和KuduScanNodeMT具体实现,KuduScanNodeBase本身无法实例化。
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
override = 0;
// KuduScanNode支持利用统计信息对count(*)查询进行优化。
bool optimize_count_star() const { return count_star_slot_offset_ != -1; }
int count_star_slot_offset() const { return count_star_slot_offset_; }
protected:
virtual void DebugString(int indentation_level, std::stringstream* out) const override;
/// 返回扫描令牌的总数,扫描令牌是Kudu中类似ScanRange的对象,描述了Kudu表的一段连续物理位置。
int NumScanTokens() { return scan_tokens_.size(); }
/// 返回是否还有扫描令牌剩余,非线程安全的函数。
bool HasScanToken() { return (next_scan_token_idx_ < scan_tokens_.size()); }
/// 返回下一个扫描令牌,如果没有剩余的扫描令牌,则返回nullptr,非线程安全的函数。
const std::string* GetNextScanToken() {
if (!HasScanToken()) return nullptr;
const string* token = &scan_tokens_[next_scan_token_idx_++];
return token;
}
const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
private:
friend class KuduScanner;
/// 需要从Kudu表读取的数据的元组描述符在TupleDescriptorMap中的标识id.
const TupleId tuple_id_;
/// 需要从Kudu表读取的数据的元组描述符,主要包括若干槽位描述符SlotDescriptor。
const TupleDescriptor* tuple_desc_ = nullptr;
/// 指向KuduClient的指针,对象本身存储在QueryState中,并在KuduScanner和实例之间共享。
/// KuduClient是Kudu提供的C++ API之一,用于连接Kudu读取数据。
kudu::client::KuduClient* client_ = nullptr;
/// Kudu API中的Kudu表对象, 在KuduScanNode的多个KuduScanner之间共享。
kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
/// 本扫描结点需要处理的所有扫描令牌,以序列化后的形式储存,由KuduScanner反序列化并处理。
std::vector<std::string> scan_tokens_;
/// scan_tokens_中下一个待分配的令牌的索引。
int next_scan_token_idx_ = 0;
/// 如果启用了count(*)查询优化,该则值被设置为count(*)槽位在元组中的字节偏移量,否则为-1。
/// 设置此参数后,该扫描节点可以通过使用num rows统计中的数据快速填充元组来优化count(*)查询。
const int count_star_slot_offset_;
/// Kudu相关的一些性能计数器。
RuntimeProfile::Counter* kudu_round_trips_ = nullptr;
RuntimeProfile::Counter* kudu_remote_tokens_ = nullptr;
RuntimeProfile::Counter* kudu_client_time_ = nullptr;
static const std::string KUDU_ROUND_TRIPS;
static const std::string KUDU_REMOTE_TOKENS;
static const std::string KUDU_CLIENT_TIME;
kudu::client::KuduClient* kudu_client() { return client_; }
RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }
RuntimeProfile::Counter* kudu_client_time() const { return kudu_client_time_; }
};
看完了KuduScanNodeBase
的定义,我们再看看其中几个关键函数的实现,首先是Prepare
函数,其负责了结点对象创建之后的准备工作,代码如下:
Status KuduScanNodeBase::Prepare(RuntimeState* state) {
// 首先调用基类ScanNode的Prepare函数,完成ScanNode通用的准备工作。
// 在ScanNode::Prepare()中又会调用ExecNode::Prepare()完成执行结点通用的准备工作。
// ExecNode::Prepare()中的准备工作包括创建一些内存追踪器、内存池、计数器和评估谓词的表达式求值器,
// 同时其还会调用其所有子结点的Prepare函数,不过本例ScanNode在ExecTree中都是作为叶子结点的,并无子结点。
// ScanNode::Prepare()中的准备工作则包括创建一些Scan特有的计数器计时器和准备runtime filters上下文对象并为其创建表达式求值器。
RETURN_IF_ERROR(ScanNode::Prepare(state));
// 以下是一些计数器和计时器的初始化
scan_ranges_complete_counter_ =
PROFILE_ScanRangesComplete.Instantiate(runtime_profile());
kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT);
kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT);
kudu_client_time_ = ADD_TIMER(runtime_profile(), KUDU_CLIENT_TIME);
// 从表描述符中获取到该扫描结点的元组描述符
DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
// 从TScanRangeParams初始化要处理的扫描令牌列表。
DCHECK(scan_range_params_ != NULL);
int num_remote_tokens = 0;
for (const TScanRangeParams& params: *scan_range_params_) {
if (params.__isset.is_remote && params.is_remote) ++num_remote_tokens;
scan_tokens_.push_back(params.scan_range.kudu_scan_token);
}
COUNTER_SET(kudu_remote_tokens_, num_remote_tokens);
return Status::OK();
}
然后是Open
函数,其负责了结点对象准备完毕之后的启动工作,代码如下:
Status KuduScanNodeBase::Open(RuntimeState* state) {
// 与Prepare函数一样,Open函数也会先调用基类的Open函数。
// ExecNode::Open()中会开启所有表达式求值器,
// ScanNode::Open()中会开启所有runtime filter的表达式求值器。
RETURN_IF_ERROR(ScanNode::Open(state));
// 检查查询是否被取消了,若是则直接返回。
RETURN_IF_CANCELLED(state);
// ExecNode::QueryMaintenance()会清理内存池,并检查查询状态是否正常,该函数应当定期调用。
RETURN_IF_ERROR(QueryMaintenance(state));
// 为总计时器开始计时,SCOPED_TIMER是范围计时器,创建时开始计时,退出作用域时停止计时并将计时累加到传入参数中。
SCOPED_TIMER(runtime_profile_->total_time_counter());
// 从元组描述符中拿到表描述符并静态转换为Kudu表描述符。
const KuduTableDescriptor* table_desc =
static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
// 从Kudu表描述符中拿到该表所在的Kudu地址,并以此创建Kudu客户端。
RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
table_desc->kudu_master_addresses(), &client_));
// 设置最新观测到的Kudu时间戳。
uint64_t latest_ts = static_cast<uint64_t>(
max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));
VLOG_RPC << "Latest observed Kudu timestamp: " << latest_ts;
if (latest_ts > 0) client_->SetLatestObservedTimestamp(latest_ts);
// 调用Kudu API开启kudu表。
KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
"Unable to open Kudu table");
runtime_profile_->AddInfoString("Table Name", table_desc->fully_qualified_name());
// 如果有runtime filters可用,则先等待所有runtime filters到达。
if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
return Status::OK();
}
如上文所述,KuduScanNodeBase
只是一个抽象了KuduScanNode
和KuduScanNodeMT
共通逻辑的抽象类,本身无法实例化,主要的作用就是减少重复代码,更为核心的执行逻辑GetNext
需要KuduScanNode
和KuduScanNodeMT
自行实现,我们接下来看KuduScanNodeMT
。
多实例版本的Kudu扫描结点KuduScanNodeMT
KuduScanNodeBase
有两个派生类,分别为KuduScanNode
和KuduScanNodeMT
,其中KuduScanNode
是更为常用的版本,其内部实现了多线程的扫描逻辑,而KuduScanNodeMT
则是为Impala的MT_DOP准备的版本。Impala为了更加充分地利用CPU和内存资源提升查询并发度而提供了MT_DOP这个Query Option,其允许用户手动指定一个并发度,Impala会在每个执行节点为每个Fragment创建指定个数的实例(实例数量还受别的因素制约,如ScanRange数量、Fragment类型),Impala3.4对MT_DOP的支持还比较有限,只有某些ScanNode进行了支持。
通过MT_DOP指定并发度后,Kudu扫描将使用KuduScanNodeMT
,每个相关实例包含一个KuduScanNodeMT
来实现指定并发度,所以KuduScanNodeMT
本身是单线程的工作模型,代码也比较简单,其定义如下:
class KuduScanNodeMt : public KuduScanNodeBase {
public:
KuduScanNodeMt(ObjectPool* pool, const ScanPlanNode& pnode, const DescriptorTbl& descs);
~KuduScanNodeMt();
virtual Status Open(RuntimeState* state) override;
// KuduScanNodeMt的核心函数,实现了通过KuduScanner处理token并返回行批RowBatch的逻辑。
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
virtual void Close(RuntimeState* state) override;
// getExecutionModel函数覆写了ExecNode::getExecutionModel(),返回本结点的执行模型,DEBUG适用。
// 执行模型ExecutionModel是个枚举类型,反应了本结点的多线程模型,
// TASK_BASED指的是基于任务的多线程,另外还有HdfsScanNodeMT属于此类型。
virtual ExecutionModel getExecutionModel() const override { return TASK_BASED; }
private:
// 指向当前正在扫描的scan token的指针。
const std::string* scan_token_;
// KuduScanner的独占指针,KuduScanner实现了Kudu扫描的核心逻辑。
std::unique_ptr<KuduScanner> scanner_;
};
然后是KuduScanNodeMT
的几个关键方法:
Status KuduScanNodeMt::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
// 首先调用基类的Open函数,完成Kudu扫描的通用开启逻辑。
RETURN_IF_ERROR(KuduScanNodeBase::Open(state));
// 实例化一个KuduScanner并开启,KuduScanner实现了Kudu扫描的核心逻辑。
scanner_.reset(new KuduScanner(this, runtime_state_));
RETURN_IF_ERROR(scanner_->Open());
return Status::OK();
}
Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
DCHECK(row_batch != NULL);
// ExecDebugAction用来执行调试行为,可以在查询中引入人为设定的问题条件,用于内部调试和故障排除。
// 具体参见QueryOptions DEBUG_ACTION的说明。
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
// 若查询被取消则直接返回。
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
// eos标识了当前扫描结点是否扫描完成,当数据扫描完成或达到limit限制时,该值会被置为true。
*eos = false;
// 首先判断当前是否有正在处理的scan token,scan_token_为nullptr则进入循环。
bool scan_token_eos = scan_token_ == nullptr;
while (scan_token_eos) {
// 尝试获取下一个scan token。
scan_token_ = GetNextScanToken();
// 若获取到nullptr,说明本结点的scan token已经全部处理完毕。
if (scan_token_ == nullptr) {
// 扫描结束,停止所有定期更新的计数器、关闭scanner并设置eos为true然后返回。
runtime_profile_->StopPeriodicCounters();
scanner_->Close();
scanner_.reset();
*eos = true;
return Status::OK();
}
// 获取到下个scan token,调用KuduScanner::OpenNextScanToken进行开启,
// 如果该scan token没有需要扫描的行,则scan_token_eos会被置为true。
RETURN_IF_ERROR(scanner_->OpenNextScanToken(*scan_token_, &scan_token_eos));
}
// scanner_eos标识了当前KuduScanner正在处理的scan token是否完成。
bool scanner_eos = false;
// 调用KuduScanner::GetNext(),获取下一个RowBatch。
RETURN_IF_ERROR(scanner_->GetNext(row_batch, &scanner_eos));
// 如果scanner_eos为true,说明当前scan token扫描完成,可以为相关计数器+1并设置scan_token_为空指针。
if (scanner_eos) {
scan_ranges_complete_counter_->Add(1);
scan_token_ = nullptr;
}
// 让KuduScanner向Kudu服务发送Ping以保持活动状态。
scanner_->KeepKuduScannerAlive();
// ExecNode::CheckLimitAndTruncateRowBatchIfNeeded函数会检查该结点是达到了limit限制,
// 若达到限制则设置eos为true并截断RowBatch中超出limit的多余行,此外每次调用都还会更新相关计数器。
if (CheckLimitAndTruncateRowBatchIfNeeded(row_batch, eos)) {
// CheckLimitAndTruncateRowBatchIfNeeded返回true说明扫描达到了limit限制,
// 扫描结束,停止所有定期更新的计数器、关闭scanner。
scan_token_ = nullptr;
runtime_profile_->StopPeriodicCounters();
scanner_->Close();
scanner_.reset();
}
// rows_returned计数器记录了当前节点目前返回的行数,将其更新到profile的计数器中。
COUNTER_SET(rows_returned_counter_, rows_returned());
return Status::OK();
}
void KuduScanNodeMt::Close(RuntimeState* state) {
if (is_closed()) return;
SCOPED_TIMER(runtime_profile_->total_time_counter());
// 首先关闭KuduScanner,然后调用基类的Close函数,
// KuduScanNodeBase并没有覆写Close函数,实际调用的是ScanNode::Close(),
if (scanner_.get() != nullptr) scanner_->Close();
scanner_.reset();
KuduScanNodeBase::Close(state);
}
至此,KuduScanNodeMT
我们就分析完了,由于其是单线程工作模型,可以发现其逻辑还是比较简单的,而关键的扫描逻辑都被KuduScanner
实现了,后续文章我们继续分析多线程工作的KuduScanNode
和KuduScanner
。