insert overwrite table dwintdata.dw_f_da_enterprise2
select *
from dwintdata.dw_f_da_enterprise;
HDFS file sizes and counts
Note that there are 17 files here, 321M in total, and the job ended up split into 21 tasks.
Why 21 tasks? Isn't the split size 128M or 64M? Or, since every file here is under 128M, shouldn't it be one map per file?
Tez UI logs
Now look closely at each task's log:
map0 data_source=CSIG/HIVE_UNION_SUBDIR_1/000008_0:0+16903572
map1 data_source=CSIG/HIVE_UNION_SUBDIR_1/000000_0:0+16960450
map2 data_source=CSIG/HIVE_UNION_SUBDIR_1/000003_0:0+16808165
map3 data_source=CSIG/HIVE_UNION_SUBDIR_1/000001_0:0+17007259
map4 data_source=CSIG/HIVE_UNION_SUBDIR_1/000006_0:0+16877230
map5 data_source=CSIG/HIVE_UNION_SUBDIR_1/000004_0:0+16941186
map6 data_source=hehe/HIVE_UNION_SUBDIR_2/000004_0:0+16777216
map7 data_source=hehe/HIVE_UNION_SUBDIR_2/000002_0:0+16777216
map8 data_source=CSIG/HIVE_UNION_SUBDIR_1/000002_0:0+16946639
map9 data_source=CSIG/HIVE_UNION_SUBDIR_1/000009_0:0+16855768
map10 data_source=hehe/HIVE_UNION_SUBDIR_2/000001_0:0+16777216
map11 data_source=CSIG/HIVE_UNION_SUBDIR_1/000005_0:0+16872517
map12 data_source=hehe/HIVE_UNION_SUBDIR_2/000000_0:0+16777216
map13 data_source=hehe/HIVE_UNION_SUBDIR_2/000006_0:0+16777216
map14 data_source=hehe/HIVE_UNION_SUBDIR_2/000000_0:16777216+729642 (note this one)
data_source=hehe/HIVE_UNION_SUBDIR_2/000001_0:16777216+7188613
map15 data_source=CSIG/HIVE_UNION_SUBDIR_1/000007_0:0+16761291
map16 data_source=hehe/HIVE_UNION_SUBDIR_2/000005_0:0+16777216
map17 data_source=hehe/HIVE_UNION_SUBDIR_2/000003_0:0+16777216
map18 data_source=hehe/HIVE_UNION_SUBDIR_2/000002_0:16777216+7404916
data_source=hehe/HIVE_UNION_SUBDIR_2/000005_0:16777216+7341669
map19 data_source=hehe/HIVE_UNION_SUBDIR_2/000003_0:16777216+7378488
data_source=hehe/HIVE_UNION_SUBDIR_2/000006_0:16777216+7268763
map20 data_source=hehe/HIVE_UNION_SUBDIR_2/000004_0:16777216+7070700
data_source=hehe/000001_0:0+12488
16777216: what kind of number is that? Be alert to it. 1024*1024*16 = 16777216 = 16M. Does that mean a map can only read 16M?
My 18 files total 321M; since some pieces come in under 16M, the job ends up with 21 maps. But why 16M per map?
tez.grouping.min-size=16777216
tez.grouping.max-size=134217728 --128M
tez.grouping.split-waves=1.7
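To preview the arithmetic (a minimal sketch of my reading of it, which the source walkthrough below bears out): when the planned size per group would drop below tez.grouping.min-size, Tez recomputes the group count as totalLength / min-size + 1.

// Minimal arithmetic sketch (not Tez code): why ~321M of input with
// tez.grouping.min-size=16M comes out to 21 map tasks.
public class MinSizeArithmetic {
    public static void main(String[] args) {
        long totalLength = 337_336_647L;   // total input size, ~321M (the same figure shows up again below)
        long minSize = 16L * 1024 * 1024;  // tez.grouping.min-size = 16777216
        // groups would be smaller than minSize, so the grouper recomputes the count:
        long groups = totalLength / minSize + 1;   // integer division, +1 to cover the remainder
        System.out.println(groups);        // prints 21
    }
}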
Borrowing from an article I read a while ago:
Hive 基于Tez引擎 map和reduce数的参数控制原理与调优经验_tez.grouping.max-size_abcdggggggg的博客-CSDN博客
This seems to explain it, yet it also doesn't quite: with a minimum of 16M and a maximum of 128M, what happens to a 64M file?
One map? 64/16 = 4 maps? Or split as 64+64 into 2 maps?
Testing the mapper count
Test parameter: tez.grouping.min-size
Test 1
set tez.grouping.min-size=16777216;
21 maps, 2 reduces, 21 output files, 6s elapsed
Test 2
set tez.grouping.min-size=67108864; --64M
8 maps, 2 reduces, 12 output files, 8s elapsed
Test 3
set tez.grouping.min-size=134217728; --128M
5 maps, 2 reduces, 8 output files, 11s elapsed
Conclusions
The larger each map's input, the fewer maps and the fewer output files; in these tests, though, the job actually got slower as the maps got bigger (6s, 8s, 11s). It is not a hard rule in either direction: make each map read only 1KB and scheduling overhead would dominate instead.
Source code walkthrough
The blogger above mentions a parameter, tez.grouping.split-count, which I could not find documented anywhere, so the only option was the source code. As it turns out, that blogger had not really dug into it either.
https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java#L187
set tez.grouping.by-length=true -- default is true
set tez.grouping.by-count=false -- default is false
set tez.grouping.max-size=1024*1024*1024 -- 1G; this is how the default is defined in the Java source, so compute the actual number yourself when you set it
set tez.grouping.min-size=50*1024*1024 -- 50M default
//originalSplits: the splits the input files were divided into
//desiredNumSplits: the split count estimated beforehand
public List<GroupedSplitContainer> getGroupedSplits(Configuration conf,
List<SplitContainer> originalSplits,
int desiredNumSplits,
String wrappedInputFormatName,
SplitSizeEstimatorWrapper estimator,
SplitLocationProviderWrapper locationProvider) throws
IOException, InterruptedException {
LOG.info("Grouping splits in Tez");
Objects.requireNonNull(originalSplits, "Splits must be specified");
//here the configured tez.grouping.split-count is read (0 if unset)
int configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);
if (configNumSplits > 0) {
// always use config override if specified
//desiredNumSplits is Tez's own estimate of how many splits it wants; if split-count is configured it always takes precedence
desiredNumSplits = configNumSplits;
LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);
}
if (estimator == null) {
estimator = DEFAULT_SPLIT_ESTIMATOR;
}
if (locationProvider == null) {
locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER;
}
List<GroupedSplitContainer> groupedSplits = null;
String emptyLocation = "EmptyLocation";
String localhost = "localhost";
String[] emptyLocations = {emptyLocation};
groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);
//check whether every split reports localhost as a preferred location; my guess: data files have 3 replicas, so the task might run on node11 while the data lives on node11, node12 and node13
boolean allSplitsHaveLocalhost = true;
long totalLength = 0;
Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);
// go through splits and add them to locations
for (SplitContainer split : originalSplits) {
totalLength += estimator.getEstimatedSize(split);
String[] locations = locationProvider.getPreferredLocations(split);
if (locations == null || locations.length == 0) {
locations = emptyLocations;
allSplitsHaveLocalhost = false;
}
//normalize each location and check whether it is localhost
for (String location : locations ) {
if (location == null) {
location = emptyLocation;
allSplitsHaveLocalhost = false;
}
if (!location.equalsIgnoreCase(localhost)) {
allSplitsHaveLocalhost = false;
}
distinctLocations.put(location, null);
}
}
//enter this block only when tez.grouping.split-count was NOT configured and there is at least one split
//(i.e. the negation of: split-count set, or no splits at all)
if (! (configNumSplits > 0 ||
originalSplits.size() == 0)) {
// numSplits has not been overridden by config
// numSplits has been set at runtime
// there are splits generated
// desired splits is less than number of splits generated
// Do sanity checks
//split-count was not configured in this branch, so desiredNumSplits is Tez's own runtime estimate (fall back to the original split count if it is 0)
int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();
//total size divided by the split count; in my case the total is ~320M (337336647 bytes), e.g. 337336647 / 3 = 112,445,549 per group
long lengthPerGroup = totalLength/splitCount;
//the configured maximum group size
long maxLengthPerGroup = conf.getLong(
TEZ_GROUPING_SPLIT_MAX_SIZE,
TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);
//the configured minimum group size
long minLengthPerGroup = conf.getLong(
TEZ_GROUPING_SPLIT_MIN_SIZE,
TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);
if (maxLengthPerGroup < minLengthPerGroup ||
minLengthPerGroup <=0) {
throw new TezUncheckedException(
"Invalid max/min group lengths. Required min>0, max>=min. " +
" max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);
}
//guard against an unreasonable group size, e.g. 100G of data with a desired count of 1 would mean one 100G group, far above the max (128M in my config, 1G by default)
if (lengthPerGroup > maxLengthPerGroup) {
// splits too big to work. Need to override with max size.
//recompute as totalLength/max + 1 (+1 to cover the remainder), i.e. go with the max group size
int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;
LOG.info("Desired splits: " + desiredNumSplits + " too small. " +
" Desired splitLength: " + lengthPerGroup +
" Max splitLength: " + maxLengthPerGroup +
" New desired splits: " + newDesiredNumSplits +
" Total length: " + totalLength +
" Original splits: " + originalSplits.size());
desiredNumSplits = newDesiredNumSplits;
} else if (lengthPerGroup < minLengthPerGroup) {
// splits too small to work. Need to override with size.
int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;
/**
* This is a workaround for systems like S3 that pass the same
* fake hostname for all splits.
*/
if (!allSplitsHaveLocalhost) {
desiredNumSplits = newDesiredNumSplits;
}
LOG.info("Desired splits: " + desiredNumSplits + " too large. " +
" Desired splitLength: " + lengthPerGroup +
" Min splitLength: " + minLengthPerGroup +
" New desired splits: " + newDesiredNumSplits +
" Final desired splits: " + desiredNumSplits +
" All splits have localhost: " + allSplitsHaveLocalhost +
" Total length: " + totalLength +
" Original splits: " + originalSplits.size());
}
}
if (desiredNumSplits == 0 ||
originalSplits.size() == 0 ||
desiredNumSplits >= originalSplits.size()) {
// nothing set. so return all the splits as is
LOG.info("Using original number of splits: " + originalSplits.size() +
" desired splits: " + desiredNumSplits);
groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size());
for (SplitContainer split : originalSplits) {
GroupedSplitContainer newSplit =
new GroupedSplitContainer(1, wrappedInputFormatName, cleanupLocations(locationProvider.getPreferredLocations(split)),
null);
newSplit.addSplit(split);
groupedSplits.add(newSplit);
}
return groupedSplits;
}
//total size divided by the desired number of splits (by-length)
long lengthPerGroup = totalLength/desiredNumSplits;
//number of distinct nodes holding the data
int numNodeLocations = distinctLocations.size();
//number of splits per node (by-node)
int numSplitsPerLocation = originalSplits.size()/numNodeLocations;
//number of splits per group (by-count)
int numSplitsInGroup = originalSplits.size()/desiredNumSplits;
// allocation loop here so that we have a good initial size for the lists
for (String location : distinctLocations.keySet()) {
distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));
}
Set<String> locSet = new HashSet<String>();
//iterate over all the original splits and register each one under its preferred locations
for (SplitContainer split : originalSplits) {
locSet.clear();
String[] locations = locationProvider.getPreferredLocations(split);
if (locations == null || locations.length == 0) {
locations = emptyLocations;
}
for (String location : locations) {
if (location == null) {
location = emptyLocation;
}
locSet.add(location);
}
for (String location : locSet) {
LocationHolder holder = distinctLocations.get(location);
holder.splits.add(split);
}
}
//group by size, default true
boolean groupByLength = conf.getBoolean(
TEZ_GROUPING_SPLIT_BY_LENGTH,
TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
//group by a specified split count, default false
boolean groupByCount = conf.getBoolean(
TEZ_GROUPING_SPLIT_BY_COUNT,
TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
//only form node-local groups (no rack-local fallback), default false
boolean nodeLocalOnly = conf.getBoolean(
TEZ_GROUPING_NODE_LOCAL_ONLY,
TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT);
if (!(groupByLength || groupByCount)) {
throw new TezUncheckedException(
"None of the grouping parameters are true: "
+ TEZ_GROUPING_SPLIT_BY_LENGTH + ", "
+ TEZ_GROUPING_SPLIT_BY_COUNT);
}
//log the grouping parameters
LOG.info("Desired numSplits: " + desiredNumSplits +
" lengthPerGroup: " + lengthPerGroup +
" numLocations: " + numNodeLocations +
" numSplitsPerLocation: " + numSplitsPerLocation +
" numSplitsInGroup: " + numSplitsInGroup +
" totalLength: " + totalLength +
" numOriginalSplits: " + originalSplits.size() +
" . Grouping by length: " + groupByLength +
" count: " + groupByCount +
" nodeLocalOnly: " + nodeLocalOnly);
// go through locations and group splits
//how many splits have been processed so far
int splitsProcessed = 0;
List<SplitContainer> group = new ArrayList<SplitContainer>(numSplitsInGroup);
Set<String> groupLocationSet = new HashSet<String>(10);
boolean allowSmallGroups = false;
boolean doingRackLocal = false;
int iterations = 0;
//keep looping until every split has been placed into a group
while (splitsProcessed < originalSplits.size()) {
iterations++;
int numFullGroupsCreated = 0;
for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
group.clear();
groupLocationSet.clear();
String location = entry.getKey();
LocationHolder holder = entry.getValue();
SplitContainer splitContainer = holder.getUnprocessedHeadSplit();
if (splitContainer == null) {
// all splits on node processed
continue;
}
int oldHeadIndex = holder.headIndex;
long groupLength = 0;
int groupNumSplits = 0;
do {
//add splits from this location to the current group
group.add(splitContainer);
groupLength += estimator.getEstimatedSize(splitContainer);
groupNumSplits++;
holder.incrementHeadIndex();
splitContainer = holder.getUnprocessedHeadSplit();
} while(splitContainer != null
&& (!groupByLength ||
(groupLength + estimator.getEstimatedSize(splitContainer) <= lengthPerGroup))
&& (!groupByCount ||
(groupNumSplits + 1 <= numSplitsInGroup)));
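//at this point 'group' holds consecutive splits from this location; the loop above stops once adding
//the next split would exceed lengthPerGroup (grouping by length) or numSplitsInGroup (grouping by count)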
if (holder.isEmpty()
&& !allowSmallGroups
&& (!groupByLength || groupLength < lengthPerGroup/2)
&& (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
// group too small, reset it
holder.headIndex = oldHeadIndex;
continue;
}
numFullGroupsCreated++;
// One split group created
String[] groupLocation = {location};
if (location == emptyLocation) {
groupLocation = null;
} else if (doingRackLocal) {
for (SplitContainer splitH : group) {
String[] locations = locationProvider.getPreferredLocations(splitH);
if (locations != null) {
for (String loc : locations) {
if (loc != null) {
groupLocationSet.add(loc);
}
}
}
}
groupLocation = groupLocationSet.toArray(groupLocation);
}
GroupedSplitContainer groupedSplit =
new GroupedSplitContainer(group.size(), wrappedInputFormatName,
groupLocation,
// pass rack local hint directly to AM
((doingRackLocal && location != emptyLocation)?location:null));
for (SplitContainer groupedSplitContainer : group) {
groupedSplit.addSplit(groupedSplitContainer);
Preconditions.checkState(groupedSplitContainer.isProcessed() == false,
"Duplicates in grouping at location: " + location);
groupedSplitContainer.setIsProcessed(true);
splitsProcessed++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Grouped " + group.size()
+ " length: " + groupedSplit.getLength()
+ " split at: " + location);
}
groupedSplits.add(groupedSplit);
}
if (!doingRackLocal && numFullGroupsCreated < 1) {
// no node could create a regular node-local group.
// Allow small groups if that is configured.
if (nodeLocalOnly && !allowSmallGroups) {
LOG.info(
"Allowing small groups early after attempting to create full groups at iteration: {}, groupsCreatedSoFar={}",
iterations, groupedSplits.size());
allowSmallGroups = true;
continue;
}
// else go rack-local
doingRackLocal = true;
// re-create locations
int numRemainingSplits = originalSplits.size() - splitsProcessed;
Set<SplitContainer> remainingSplits = new HashSet<SplitContainer>(numRemainingSplits);
// gather remaining splits.
for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
LocationHolder locHolder = entry.getValue();
while (!locHolder.isEmpty()) {
SplitContainer splitHolder = locHolder.getUnprocessedHeadSplit();
if (splitHolder != null) {
remainingSplits.add(splitHolder);
locHolder.incrementHeadIndex();
}
}
}
if (remainingSplits.size() != numRemainingSplits) {
throw new TezUncheckedException("Expected: " + numRemainingSplits
+ " got: " + remainingSplits.size());
}
// doing all this now instead of up front because the number of remaining
// splits is expected to be much smaller
RackResolver.init(conf);
Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());
Map<String, LocationHolder> rackLocations = createLocationsMap(conf);
for (String location : distinctLocations.keySet()) {
String rack = emptyLocation;
if (location != emptyLocation) {
rack = RackResolver.resolve(location).getNetworkLocation();
}
locToRackMap.put(location, rack);
if (rackLocations.get(rack) == null) {
// splits will probably be located in all racks
rackLocations.put(rack, new LocationHolder(numRemainingSplits));
}
}
distinctLocations.clear();
HashSet<String> rackSet = new HashSet<String>(rackLocations.size());
int numRackSplitsToGroup = remainingSplits.size();
for (SplitContainer split : originalSplits) {
if (numRackSplitsToGroup == 0) {
break;
}
// Iterate through the original splits in their order and consider them for grouping.
// This maintains the original ordering in the list and thus subsequent grouping will
// maintain that order
if (!remainingSplits.contains(split)) {
continue;
}
numRackSplitsToGroup--;
rackSet.clear();
String[] locations = locationProvider.getPreferredLocations(split);
if (locations == null || locations.length == 0) {
locations = emptyLocations;
}
for (String location : locations ) {
if (location == null) {
location = emptyLocation;
}
rackSet.add(locToRackMap.get(location));
}
for (String rack : rackSet) {
rackLocations.get(rack).splits.add(split);
}
}
remainingSplits.clear();
distinctLocations = rackLocations;
// adjust split length to be smaller because the data is non local
float rackSplitReduction = conf.getFloat(
TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION,
TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);
if (rackSplitReduction > 0) {
long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);
int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);
if (newLengthPerGroup > 0) {
lengthPerGroup = newLengthPerGroup;
}
if (newNumSplitsInGroup > 0) {
numSplitsInGroup = newNumSplitsInGroup;
}
}
LOG.info("Doing rack local after iteration: " + iterations +
" splitsProcessed: " + splitsProcessed +
" numFullGroupsInRound: " + numFullGroupsCreated +
" totalGroups: " + groupedSplits.size() +
" lengthPerGroup: " + lengthPerGroup +
" numSplitsInGroup: " + numSplitsInGroup);
// dont do smallGroups for the first pass
continue;
}
if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {
// a few nodes have a lot of data or data is thinly spread across nodes
// so allow small groups now
allowSmallGroups = true;
LOG.info("Allowing small groups after iteration: " + iterations +
" splitsProcessed: " + splitsProcessed +
" numFullGroupsInRound: " + numFullGroupsCreated +
" totalGroups: " + groupedSplits.size());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Iteration: " + iterations +
" splitsProcessed: " + splitsProcessed +
" numFullGroupsInRound: " + numFullGroupsCreated +
" totalGroups: " + groupedSplits.size());
}
}
LOG.info("Number of splits desired: " + desiredNumSplits +
" created: " + groupedSplits.size() +
" splitsProcessed: " + splitsProcessed);
return groupedSplits;
}
set tez.grouping.by-length=true -- default is true
set tez.grouping.by-count=false -- default is false
set tez.grouping.node.local.only -- default is false
set tez.grouping.max-size=1024*1024*1024 -- 1G default, as defined in the Java source; compute the actual number yourself when setting it
set tez.grouping.min-size=50*1024*1024 -- 50M default
set tez.grouping.split-count=0
Taking my own job as the example:
max=128M, min=16M, by-count=true, by-length=true, split-count=10
The files total about 320M.
Skipping the earlier branches: in practice the data was read in min=16M chunks, so there were 21 splits, grouped into 21 groups.
If split-count=10: 320M/10 = 32M per group, which is between min and max, so the final count is 10.
If split-count=2: 320M/2 = 160M per group, which is not between min and max, so by the formula above it becomes 320M/max = 2 (integer division) + 1 = 3 groups.
The rest of the method is long and I did not read it all.
In any case the numbers get adjusted again based on tez.grouping.rack-split-reduction=0.75.
In short, this parameter does work, and by-count is somewhat useful.
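Condensing the first half of getGroupedSplits into a small runnable sketch (simplified names and omitted details, not the actual Tez API) makes the order of the decisions easier to follow: the split-count override is applied first, the min/max clamp runs in the non-overridden branch, and once the desired count reaches the number of original splits, each split simply becomes its own group.

// Sketch of how the desired group count is derived in TezSplitGrouper.getGroupedSplits
// (simplified; not the actual Tez API, and the localhost/S3 special case is omitted).
public class GroupingMath {
    static int desiredGroups(long totalLength, int originalSplits, int tezEstimate,
                             int configSplitCount, long minSize, long maxSize) {
        int desired = (configSplitCount > 0) ? configSplitCount : tezEstimate;  // tez.grouping.split-count wins when set
        if (configSplitCount <= 0 && originalSplits > 0) {                      // clamp only in the non-overridden branch
            long lengthPerGroup = totalLength / (desired > 0 ? desired : originalSplits);
            if (lengthPerGroup > maxSize) {
                desired = (int) (totalLength / maxSize) + 1;   // groups too big: cap at max-size
            } else if (lengthPerGroup < minSize) {
                desired = (int) (totalLength / minSize) + 1;   // groups too small: floor at min-size
            }
        }
        if (desired == 0 || originalSplits == 0 || desired >= originalSplits) {
            return originalSplits;                             // each original split becomes its own group
        }
        return desired;
    }

    public static void main(String[] args) {
        long total = 337_336_647L;                // ~321M, as in my job
        long min = 16L << 20, max = 128L << 20;   // 16M / 128M as configured above
        // hypothetical inputs: 40 original splits, Tez estimate of 100
        System.out.println(desiredGroups(total, 40, 100, 0, min, max));   // 21: the 16M floor kicks in
        System.out.println(desiredGroups(total, 40, 100, 10, min, max));  // 10: split-count override
    }
}

The grouping loop then packs splits toward totalLength / desired bytes per group (when by-length is on), so the final group count can still drift a little from the desired number.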
Testing the parameter tez.grouping.split-count
Test 1
set tez.grouping.by-count=true;
set tez.grouping.split-count=50;
26 containers, 26 cores, 104448 MB
Test 2
set tez.grouping.by-count=true;
set tez.grouping.split-count=15;
26 containers, 26 cores, 104448 MB
Test 3
set tez.grouping.by-count=true;
set tez.grouping.split-count=10;
16 containers, 16 cores, 63488 MB
Test 4
set tez.grouping.by-count=true;
set tez.grouping.split-count=5;
9 containers, 9 cores, 34816 MB
Test 5
set tez.grouping.by-count=true;
set tez.grouping.split-count=2;
5 containers, 5 cores, 18432 MB
What does this show? The parameter really does something, but it is not a precise way to control the map count (maybe I just do not know the source well enough).
But then something suddenly felt off.
Test 6
set tez.grouping.split-count=2;
set tez.grouping.by-count=true;
set tez.grouping.by-length=false;
This suggests split-count only really takes effect when by-count=true and by-length=false.
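A sketch of the stop check from the do-while in the source (simplified, not the Tez API) is how I read this: as long as by-length stays true, the length cap can close a group no matter what the by-count target says.

// Sketch of the grouping stop condition (simplified from the do-while shown earlier).
public class GroupStopCheck {
    static boolean canAddNext(boolean byLength, boolean byCount,
                              long groupLength, long nextLength, long lengthPerGroup,
                              int groupSplits, int splitsPerGroup) {
        return (!byLength || groupLength + nextLength <= lengthPerGroup)
            && (!byCount || groupSplits + 1 <= splitsPerGroup);
    }

    public static void main(String[] args) {
        // both flags on: a ~16M split cannot join a group already at its length cap,
        // even though the count cap would still allow it
        System.out.println(canAddNext(true, true, 32L << 20, 16L << 20, 33L << 20, 2, 10));  // false
        // by-length off: only the count cap matters
        System.out.println(canAddNext(false, true, 32L << 20, 16L << 20, 33L << 20, 2, 10)); // true
    }
}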
Test 7
set tez.grouping.split-count=10;
set tez.grouping.by-count=true;
set tez.grouping.by-length=false;
Testing mapreduce.input.fileinputformat.split.minsize
mapreduce.input.fileinputformat.split.maxsize=256000000
mapreduce.input.fileinputformat.split.minsize=1
Test 1
set mapreduce.input.fileinputformat.split.minsize=128000000
18 maps here; the reason is simply the number of files.
Test 2
set mapreduce.input.fileinputformat.split.minsize=64000000
Again equal to the number of files here.
Test 3
set mapreduce.input.fileinputformat.split.minsize=16000000;
23 here, which is the 18 files plus the extra pieces from the files that got split.
Test 4
set mapreduce.input.fileinputformat.split.minsize=8000000;
25 here, which is about the same. Why did it not jump far past the 23 above (toward 46 or so)? Because tez.grouping.min-size is still 16M.
set mapreduce.input.fileinputformat.split.minsize=8000000;
set tez.grouping.min-size=8388608; --8M
See, my guess was exactly right!
At this point the mapper-count parameters are more or less covered.
Now for testing the number of reducers
Parameter | Default | Description
mapred.reduce.tasks | -1 | explicitly sets the number of reducers
hive.exec.reducers.bytes.per.reducer | 67108864 | amount of data processed per reducer
hive.exec.reducers.max | 1009 | maximum number of reducers
hive.tez.auto.reducer.parallelism | true | whether to enable automatic reducer parallelism
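For orientation, this is roughly how the reducer count gets estimated when mapred.reduce.tasks is left at -1 (a sketch of my understanding, not the exact Hive code; note too that the size used in practice is the estimated data feeding the reducer stage, and hive.tez.auto.reducer.parallelism lets Tez shrink the count again at runtime, which may explain some of the results below).

// Rough sketch of the usual reducer estimate (simplified; not the exact Hive source).
public class ReducerEstimate {
    static int estimateReducers(long inputBytes, long bytesPerReducer,
                                int maxReducers, int forcedReducers /* mapred.reduce.tasks */) {
        if (forcedReducers > 0) {
            return forcedReducers;                                              // an explicit setting wins
        }
        long estimated = (inputBytes + bytesPerReducer - 1) / bytesPerReducer;  // ceiling division
        return (int) Math.max(1, Math.min(maxReducers, estimated));
    }

    public static void main(String[] args) {
        long input = 337_336_647L;                                        // ~321M, as in the mapper tests
        System.out.println(estimateReducers(input, 64L << 20, 1009, -1)); // 6 with the 64M default
        System.out.println(estimateReducers(input, 64L << 20, 1009, 4));  // 4 when mapred.reduce.tasks=4
    }
}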
Getting a bit tired.
Testing mapred.reduce.tasks
set mapred.reduce.tasks=4
The reducer count went up: 22 containers, 88064 MB
set mapred.reduce.tasks=10
22 containers, 88064 MB
set mapred.reduce.tasks=20
28 containers, 112640 MB
Testing hive.exec.reducers.bytes.per.reducer=67108864
The default is 64M; by that logic my roughly 320M should also be split into about 5 reducers.
set hive.exec.reducers.bytes.per.reducer=33554432
set hive.exec.reducers.bytes.per.reducer=8388608
No visible effect. I remember this parameter used to work; maybe it behaves differently under this engine.
Getting tired. Next up: how to tune the container sizes.