一. DGMaterializedIterator::GetNextPackrow
int DimensionGroupMaterialized::DGMaterializedIterator::GetNextPackrow(int dim, int ahead) { MEASURE_FET("DGMaterializedIterator::GetNextPackrow(int dim, int ahead)"); if (ahead == 0) return GetCurPackrow(dim); IndexTable *cur_t = t[dim]; if (cur_t == NULL) return -1; uint64_t end_block = cur_t->EndOfCurrentBlock(cur_pos); if (next_pack[dim] >= no_obj || uint64_t(next_pack[dim]) >= end_block) return -1; uint64_t ahead_pos = 0; // cout << "dim " << dim << ", " << next_pack[dim] << " -> " << // ahead1[dim] << " " << // ahead2[dim] << " " << ahead3[dim] << " (" << ahead << ")" << endl; if (ahead == 1) ahead_pos = t[dim]->Get64InsideBlock(next_pack[dim]); else if (ahead == 2 && ahead1[dim] != -1) ahead_pos = t[dim]->Get64InsideBlock(ahead1[dim]); else if (ahead == 3 && ahead2[dim] != -1) ahead_pos = t[dim]->Get64InsideBlock(ahead2[dim]); else if (ahead == 4 && ahead3[dim] != -1) ahead_pos = t[dim]->Get64InsideBlock(ahead3[dim]); if (ahead_pos == 0) return -1; return int((ahead_pos - 1) >> p_power); return -1; }
int64_t *next_pack; // beginning of the next pack (or no_obj if there is no // other pack) int64_t *ahead1, *ahead2, *ahead3; // beginning of the three next packs after next_pack, or // -1 if not determined properly
- 直接将内部对于pack的预读的实现细节暴露出去,接口的调用方必须知道DGMaterializedIterator内部的处理
- DGMaterializedIterator类的实现与调用方的逻辑互相耦合,调用方必须了解DGMaterializedIterator的实现才能使用该类的功能
- 没有更高层次的抽象,只有在iterator级别的封装使用,没有对类的功能和目的做思考,导致业务逻辑散布各个类的实现细节,对于维护和扩展是个灾难,而且出现问题难以快速定位
- 在做大的功能模块的架构之前,最基础的就是把一个函数写好,把一个类设计好,把细节做好,然后才能谈宏观的设计;不存在连一个函数都写得逻辑漏洞百出、类设计得稀烂,却能做好功能设计的情况,这个类设计得烂得出奇
- 先设计好每个功能的划分,类承载哪些功能和起到的作用,在恰到好处的抽象层次上解耦业务逻辑
- 对于类,建议根据面向对象的原则,去设计类和类间的交互关系
二. AggregatePackrow使用magic number作为错误码
int grouping_result = AggregatePackrow(gbw, &mit, cur_tuple); if (sender) { sender->SetAffectRows(gbw.NumOfGroups()); } if (grouping_result == 2) throw common::KilledException(); if (grouping_result != 5) packrows_found++; // for statistics if (grouping_result == 1) break; // end of the aggregation
int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) { int64_t packrow_length = mit->GetPackSizeLeft(); if (!gbw.AnyTuplesLeft(cur_tuple, cur_tuple + packrow_length - 1)) { mit->NextPackrow(); return 5; }
- 使用magic number无法直接从使用角度理解其出错的语义
- 数字无法被有意义的搜索,在定位问题时,无法根据错误码快速定位代码
- 使用枚举定义出错误码,返回枚举值
三. 逻辑不自洽,在自身的逻辑中崩溃
std::unique_ptr<GroupByWrapper> gbw_ptr(new GroupByWrapper(*gb_sharding)); gbw_ptr->FillDimsUsed(dims); gbw_ptr->SetDistinctTuples(mit->NumOfTuples()); if (!gbw_ptr->IsOnePass()) gbw_ptr->InitTupleLeft(mit->NumOfTuples());
GroupByWrapper::GroupByWrapper(const GroupByWrapper &sec) : distinct_watch(sec.p_power), m_conn(sec.m_conn), gt(sec.gt) { p_power = sec.p_power; attrs_size = sec.attrs_size; just_distinct = sec.just_distinct; virt_col = new vcolumn::VirtualColumn *[attrs_size]; input_mode = new GBInputMode[attrs_size]; is_lookup = new bool[attrs_size]; attr_mapping = new int[attrs_size]; // output attr[j] <-> gt group[attr_mapping[j]] dist_vals = new int64_t[attrs_size]; for (int i = 0; i < attrs_size; i++) { attr_mapping[i] = sec.attr_mapping[i]; virt_col[i] = sec.virt_col[i]; input_mode[i] = sec.input_mode[i]; is_lookup[i] = sec.is_lookup[i]; dist_vals[i] = sec.dist_vals[i]; } no_grouping_attr = sec.no_grouping_attr; no_aggregated_attr = sec.no_aggregated_attr; no_more_groups = sec.no_more_groups; no_groups = sec.no_groups; no_attr = sec.no_attr; pack_not_omitted = new bool[no_attr]; packrows_omitted = 0; packrows_part_omitted = 0; for (int i = 0; i < no_attr; i++) pack_not_omitted[i] = sec.pack_not_omitted[i]; tuple_left = NULL; if (sec.tuple_left) tuple_left = new Filter(*sec.tuple_left); // a copy of filter // init distinct_watch to make copy ctor has all Initialization logic distinct_watch.Initialize(no_attr); for (int gr_a = 0; gr_a < no_attr; gr_a++) { if (gt.AttrDistinct(gr_a)) { distinct_watch.DeclareAsDistinct(gr_a); } } }
if (sec.tuple_left) tuple_left = new Filter(*sec.tuple_left); // a copy of filter
void GroupByWrapper::InitTupleLeft(int64_t n) { DEBUG_ASSERT(tuple_left == NULL); tuple_left = new Filter(n, p_power); tuple_left->Set(); }
DEBUG_ASSERT(tuple_left == NULL);
- GroupByWrapper的拷贝构造函数中,当 sec.tuple_left 不为NULL时,tuple_left 会被赋值为其副本
- 创建 GroupByWrapper后立即调用 InitTupleLeft,此函数中如果 tuple_left 不为NULL,则 DEBUG_ASSERT(tuple_left == NULL) 断言失败而宕掉
四. 函数内逻辑冗余,导致被不必要的执行多次
- 进入else分支后,IsType_JoinSimple将被执行两次
五. 在没有理解原有代码的设计目的前,瞎改造,将只能串行执行的函数强行改为多线程执行
void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, _int64& limit, _int64& offset, map<int,vector<PackOrderer::OrderingInfo> > &oi, ResultSender* sender, bool limit_less_than_no_groups) { MEASURE_FET("TempTable::MultiDimensionalGroupByScan(...)"); bool first_pass = true; _int64 cur_tuple = 0; // tuples are numbered according to tuple_left filter (not used, if tuple_left is null) _int64 displayed_no_groups = 0; // Determine dimensions to be iterated bool no_dims_found = true; DimensionVector dims(mind->NoDimensions()); gbw.FillDimsUsed(dims); for(int i = 0; i < mind->NoDimensions(); i++) if(dims[i]) { no_dims_found = false; break; } if(no_dims_found) dims[0] = true; // at least one dimension is needed vector<PackOrderer> po(mind->NoDimensions()); // do not use pack orderer if there are too many expected groups // (more than 50% of tuples) if(gbw.UpperApproxOfGroups() < mind->NoTuples() / 2) { map<int,vector<PackOrderer::OrderingInfo> >::iterator oi_it; bool one_group = (gbw.UpperApproxOfGroups() == 1); for(oi_it = oi.begin(); oi_it!= oi.end(); oi_it++) PackOrderer::ChoosePackOrderer(po[(*oi_it).first],(*oi_it).second, one_group); } MIIterator mit(mind, dims, po); factor = mit.Factor(); if(mit.NoTuples() == NULL_VALUE_64 || mit.NoTuples() > MAX_ROW_NUMBER) { // 2^47, a limit for filter below throw OutOfMemoryRCException("Aggregation is too large."); } gbw.SetDistinctTuples(mit.NoTuples()); #ifndef __BH_COMMUNITY__ AggregationWorkerEnt ag_worker(gbw, this); if(gbw.MayBeParallel() && ag_worker.MayBeParallel(mit) && !limit_less_than_no_groups) // if we are going to skip groups, we cannot do it in parallel ag_worker.CheckThreads(mit); // CheckThreads() must be executed if we want to be parallel #else AggregationWorker ag_worker(gbw, this); #endif if(!gbw.IsOnePass()) gbw.InitTupleLeft(mit.NoTuples()); bool rewind_needed = false; bool was_prefetched = false; try { do { if(rccontrol.isOn()) { if(gbw.UpperApproxOfGroups() == 1 || first_pass) 
rccontrol.lock(m_conn->GetThreadID()) << "Aggregating: " << mit.NoTuples() << " tuples left." << unlock; else rccontrol.lock(m_conn->GetThreadID()) << "Aggregating: " << gbw.TuplesNoOnes() << " tuples left, " << displayed_no_groups << " gr. found so far" << unlock; } cur_tuple = 0; gbw.ClearNoGroups(); // count groups locally created in this pass gbw.ClearDistinctBuffers(); // reset buffers for a new contents gbw.AddAllGroupingConstants(mit); ag_worker.Init(mit); if(rewind_needed) mit.Rewind(); // aggregated rows will be massively omitted packrow by packrow rewind_needed = true; was_prefetched = false; for(uint i = 0; i < t->NoAttrs(); i++) { // left as uninitialized (NULL or 0) if(t->GetAttrP(i)->mode == DELAYED) { MIDummyIterator m(1); t->GetAttrP(i)->term.vc->LockSourcePacks(m); } } while(mit.IsValid()) { / First stage - some distincts may be delayed if(m_conn->killed()) throw KilledRCException(); /// Grouping on a packrow _int64 packrow_length = mit.GetPackSizeLeft(); if(ag_worker.ThreadsUsed() == 1) { if(was_prefetched == false) { for(int i = 0; i < gbw.NoAttr(); i++) if(gbw.GetColumn(i)) gbw.GetColumn(i)->InitPrefetching(mit); was_prefetched = true; } int grouping_result = AggregatePackrow(gbw, &mit, cur_tuple); if(grouping_result == 2) throw KilledRCException(); if(grouping_result != 5) packrows_found++; // for statistics if(grouping_result == 1) break; // end of the aggregation if(!gbw.IsFull() && gbw.MemoryBlocksLeft() == 0) { gbw.SetAsFull(); } } else { if(was_prefetched) { for(int i = 0; i < gbw.NoAttr(); i++) if(gbw.GetColumn(i)) gbw.GetColumn(i)->StopPrefetching(); was_prefetched = false; } MIInpackIterator lmit(mit); int grouping_result = ag_worker.AggregatePackrow(lmit, cur_tuple); if(grouping_result != 5) packrows_found++; // for statistics if(grouping_result == 1) break; if(grouping_result == 2) throw KilledRCException(); if(grouping_result == 3 || grouping_result == 4) throw NotImplementedRCException("Aggregation overflow."); 
if(mit.BarrierAfterPackrow()) { ag_worker.Barrier(); } ag_worker.ReevaluateNumberOfThreads(mit); mit.NextPackrow(); } cur_tuple += packrow_length; } MultiDimensionalDistinctScan(gbw, mit); // if not needed, no effect ag_worker.Commit(); // Now it is time to prepare output values if(first_pass) { first_pass = false; _int64 upper_groups = gbw.NoGroups() + gbw.TuplesNoOnes(); // upper approximation: the current size + all other possible rows (if any) t->CalculatePageSize(upper_groups); if(upper_groups > gbw.UpperApproxOfGroups()) upper_groups = gbw.UpperApproxOfGroups(); // another upper limitation: not more than theoretical number of combinations MIDummyIterator m(1); for(uint i = 0; i < t->NoAttrs(); i++) { t->GetAttrP(i)->CreateBuffer(upper_groups); // note: may be more than needed if(t->GetAttrP(i)->mode == DELAYED) t->GetAttrP(i)->term.vc->LockSourcePacks(m); } } rccontrol.lock(m_conn->GetThreadID()) << "Generating output." << unlock; gbw.RewindRows(); while(gbw.RowValid()) { // copy GroupTable into TempTable, row by row if(t->NoObj() >= limit) break; AggregateFillOutput(gbw, gbw.GetCurrentRow(), offset); // offset is decremented for each row, if positive if(sender && t->NoObj() > 65535) { TempTable::RecordIterator iter = t->begin(); for(_int64 i = 0; i < t->NoObj(); i++) { sender->Send(iter); ++iter; } displayed_no_groups += t->NoObj(); limit -= t->NoObj(); t->SetNoObj(0); } gbw.NextRow(); } if(sender) { TempTable::RecordIterator iter = t->begin(); for(_int64 i = 0; i < t->NoObj(); i++) { sender->Send(iter); ++iter; } displayed_no_groups += t->NoObj(); limit -= t->NoObj(); t->SetNoObj(0); } else displayed_no_groups = t->NoObj(); if(t->NoObj() >= limit) break; if(gbw.AnyTuplesLeft()) gbw.ClearUsed(); // prepare for the next pass, if needed } while(gbw.AnyTuplesLeft()); // do the next pass, if anything left } catch(...) { ag_worker.Commit(false); throw; } if(rccontrol.isOn()) rccontrol.lock(m_conn->GetThreadID()) << "Aggregated (" << displayed_no_groups << " gr). 
Omitted packrows: " << gbw.packrows_omitted << " + " << gbw.packrows_part_omitted << " partially, out of " << packrows_found << " total." << unlock; }
while(mit.IsValid()) { / First stage - some distincts may be delayed if(m_conn->killed()) throw KilledRCException(); /// Grouping on a packrow _int64 packrow_length = mit.GetPackSizeLeft(); if(ag_worker.ThreadsUsed() == 1) { if(was_prefetched == false) { for(int i = 0; i < gbw.NoAttr(); i++) if(gbw.GetColumn(i)) gbw.GetColumn(i)->InitPrefetching(mit); was_prefetched = true; } int grouping_result = AggregatePackrow(gbw, &mit, cur_tuple); if(grouping_result == 2) throw KilledRCException(); if(grouping_result != 5) packrows_found++; // for statistics if(grouping_result == 1) break; // end of the aggregation if(!gbw.IsFull() && gbw.MemoryBlocksLeft() == 0) { gbw.SetAsFull(); } } else { if(was_prefetched) { for(int i = 0; i < gbw.NoAttr(); i++) if(gbw.GetColumn(i)) gbw.GetColumn(i)->StopPrefetching(); was_prefetched = false; } MIInpackIterator lmit(mit); int grouping_result = ag_worker.AggregatePackrow(lmit, cur_tuple); if(grouping_result != 5) packrows_found++; // for statistics if(grouping_result == 1) break; if(grouping_result == 2) throw KilledRCException(); if(grouping_result == 3 || grouping_result == 4) throw NotImplementedRCException("Aggregation overflow."); if(mit.BarrierAfterPackrow()) { ag_worker.Barrier(); } ag_worker.ReevaluateNumberOfThreads(mit); mit.NextPackrow(); } cur_tuple += packrow_length; }
void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int64_t &limit, int64_t &offset, ResultSender *sender, bool limit_less_than_no_groups) { MEASURE_FET("TempTable::MultiDimensionalGroupByScan(...)"); bool first_pass = true; // tuples are numbered according to tuple_left filter (not used, if tuple_left // is null) int64_t cur_tuple = 0; int64_t displayed_no_groups = 0; // Determine dimensions to be iterated bool no_dims_found = true; DimensionVector dims(mind->NumOfDimensions()); gbw.FillDimsUsed(dims); for (int i = 0; i < mind->NumOfDimensions(); i++) if (dims[i]) { no_dims_found = false; break; } if (no_dims_found) dims[0] = true; // at least one dimension is needed std::vector<PackOrderer> po(mind->NumOfDimensions()); MIIterator mit(mind, dims, po); factor = mit.Factor(); if (mit.NumOfTuples() == common::NULL_VALUE_64 || mit.NumOfTuples() > common::MAX_ROW_NUMBER) { // 2^47, a limit for filter below throw common::OutOfMemoryException("Aggregation is too large."); } gbw.SetDistinctTuples(mit.NumOfTuples()); int thd_cnt = 1; if (ParallelAllowed(gbw) && !limit_less_than_no_groups) { thd_cnt = std::thread::hardware_concurrency() / 4; // For concurrence reason, don't swallow all cores once. } AggregationWorkerEnt ag_worker(gbw, mind, thd_cnt, this); if (!gbw.IsOnePass()) gbw.InitTupleLeft(mit.NumOfTuples()); bool rewind_needed = false; try { do { if (rccontrol.isOn()) { if (gbw.UpperApproxOfGroups() == 1 || first_pass) rccontrol.lock(m_conn->GetThreadID()) << "Aggregating: " << mit.NumOfTuples() << " tuples left." << system::unlock; else rccontrol.lock(m_conn->GetThreadID()) << "Aggregating: " << gbw.TuplesNoOnes() << " tuples left, " << displayed_no_groups << " gr. 
found so far" << system::unlock; } cur_tuple = 0; gbw.ClearNoGroups(); // count groups locally created in this pass gbw.ClearDistinctBuffers(); // reset buffers for a new contents gbw.AddAllGroupingConstants(mit); ag_worker.Init(mit); if (rewind_needed) mit.Rewind(); // aggregated rows will be massively omitted packrow by // packrow rewind_needed = true; for (uint i = 0; i < t->NumOfAttrs(); i++) { // left as uninitialized (NULL or 0) if (t->GetAttrP(i)->mode == common::ColOperation::DELAYED) { MIDummyIterator m(1); t->GetAttrP(i)->term.vc->LockSourcePacks(m); } } if (ag_worker.ThreadsUsed() > 1) { ag_worker.DistributeAggreTaskAverage(mit); } else { while (mit.IsValid()) { // need muti thread // First stage - // some distincts may be delayed if (m_conn->Killed()) throw common::KilledException(); // Grouping on a packrow int64_t packrow_length = mit.GetPackSizeLeft(); int grouping_result = AggregatePackrow(gbw, &mit, cur_tuple); if (sender) { sender->SetAffectRows(gbw.NumOfGroups()); } if (grouping_result == 2) throw common::KilledException(); if (grouping_result != 5) packrows_found++; // for statistics if (grouping_result == 1) break; // end of the aggregation if (!gbw.IsFull() && gbw.MemoryBlocksLeft() == 0) { gbw.SetAsFull(); } cur_tuple += packrow_length; } } gbw.ClearDistinctBuffers(); // reset buffers for a new contents MultiDimensionalDistinctScan(gbw, mit); // if not needed, no effect ag_worker.Commit(); // Now it is time to prepare output values if (first_pass) { first_pass = false; int64_t upper_groups = gbw.NumOfGroups() + gbw.TuplesNoOnes(); // upper approximation: the current size + // all other possible rows (if any) t->CalculatePageSize(upper_groups); if (upper_groups > gbw.UpperApproxOfGroups()) upper_groups = gbw.UpperApproxOfGroups(); // another upper limitation: not more // than theoretical number of // combinations MIDummyIterator m(1); for (uint i = 0; i < t->NumOfAttrs(); i++) { if (t->GetAttrP(i)->mode == common::ColOperation::GROUP_CONCAT) { 
t->GetAttrP(i)->SetTypeName(common::CT::VARCHAR); t->GetAttrP(i)->OverrideStringSize(tianmu_group_concat_max_len); } t->GetAttrP(i)->CreateBuffer(upper_groups); // note: may be more than needed if (t->GetAttrP(i)->mode == common::ColOperation::DELAYED) t->GetAttrP(i)->term.vc->LockSourcePacks(m); } } rccontrol.lock(m_conn->GetThreadID()) << "Group/Aggregate end. Begin generating output." << system::unlock; rccontrol.lock(m_conn->GetThreadID()) << "Output rows: " << gbw.NumOfGroups() + gbw.TuplesNoOnes() << ", output table row limit: " << t->GetPageSize() << system::unlock; int64_t output_size = (gbw.NumOfGroups() + gbw.TuplesNoOnes()) * t->GetOneOutputRecordSize(); gbw.RewindRows(); if (t->GetPageSize() >= (gbw.NumOfGroups() + gbw.TuplesNoOnes()) && output_size > (1L << 29) && !t->HasHavingConditions() && tianmu_sysvar_parallel_filloutput) { // Turn on parallel output when: // 1. output page is large enough to hold all output rows // 2. output result is larger than 512MB // 3. no have condition rccontrol.lock(m_conn->GetThreadID()) << "Start parallel output" << system::unlock; ParallelFillOutputWrapper(gbw, offset, limit, mit); } else { while (gbw.RowValid()) { // copy GroupTable into TempTable, row by row if (t->NumOfObj() >= limit) break; AggregateFillOutput(gbw, gbw.GetCurrentRow(), offset); // offset is decremented for each row, if positive if (sender && t->NumOfObj() > (1 << mind->ValueOfPower()) - 1) { TempTable::RecordIterator iter = t->begin(); for (int64_t i = 0; i < t->NumOfObj(); i++) { sender->Send(iter); ++iter; } displayed_no_groups += t->NumOfObj(); limit -= t->NumOfObj(); t->SetNumOfObj(0); } gbw.NextRow(); } } if (sender) { TempTable::RecordIterator iter = t->begin(); for (int64_t i = 0; i < t->NumOfObj(); i++) { sender->Send(iter); ++iter; } displayed_no_groups += t->NumOfObj(); limit -= t->NumOfObj(); t->SetNumOfObj(0); } else displayed_no_groups = t->NumOfObj(); if (t->NumOfObj() >= limit) break; if (gbw.AnyTuplesLeft()) gbw.ClearUsed(); // 
prepare for the next pass, if needed } while (gbw.AnyTuplesLeft()); // do the next pass, if anything left } catch (...) { ag_worker.Commit(false); throw; } if (rccontrol.isOn()) rccontrol.lock(m_conn->GetThreadID()) << "Generating output end. " << "Aggregated (" << displayed_no_groups << " group). Omitted packrows: " << gbw.packrows_omitted << " + " << gbw.packrows_part_omitted << " partially, out of " << packrows_found << " total." << system::unlock; }
if (ag_worker.ThreadsUsed() > 1) { ag_worker.DistributeAggreTaskAverage(mit); } else { while (mit.IsValid()) { // need muti thread // First stage - // some distincts may be delayed if (m_conn->Killed()) throw common::KilledException(); // Grouping on a packrow int64_t packrow_length = mit.GetPackSizeLeft(); int grouping_result = AggregatePackrow(gbw, &mit, cur_tuple); if (sender) { sender->SetAffectRows(gbw.NumOfGroups()); } if (grouping_result == 2) throw common::KilledException(); if (grouping_result != 5) packrows_found++; // for statistics if (grouping_result == 1) break; // end of the aggregation if (!gbw.IsFull() && gbw.MemoryBlocksLeft() == 0) { gbw.SetAsFull(); } cur_tuple += packrow_length; } }
- AggregationAlgorithm::AggregatePackrow在遍历迭代器过程中给GroupByWrapper的filter的blocks赋值
- 不考虑此前代码的实现,强行用多线程来跑只有单线程遍历的逻辑,不顾底层block数据实现的逻辑
- 没看明白原有代码的业务逻辑的因果关系前,不要凭臆想随意改动
- 写不出来就空着说写不出来,也别为了糊弄写一坨南辕北辙的垃圾上去