mapTask並行度的決定機制
一個job的map階段並行度由客戶端在提交job時決定,而客戶端對map階段並行度的規劃的基本邏輯為:將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多個split),然後每一個split分配一個mapTask並行實例處理。
FileInputFormat切片機制
微信:intsmaze(非誠勿擾)
1、默認切片定義在InputFormat類中的getSplit方法
2、FileInputFormat中默認的切片機制:
a) 簡單地按照文件的內容長度進行切片
b) 切片大小,默認等於hdfs的block大小
c) 切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件:
file1.txt 260M file2.txt 10M
經過FileInputFormat的切片機制運算後,形成的切片信息如下:
file1.txt.split1-- 0~128 file1.txt.split2-- 128~260 //如果剩餘的文件長度/切片長度<=1.1則會將剩餘文件的長度並未一個切片 file2.txt.split1-- 0~10M
3、FileInputFormat中切片的大小的參數配置
通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定。
minsize:默認值:1 配置參數: mapreduce.input.fileinputformat.split.minsize maxsize:默認值:Long.MAXValue 配置參數:mapreduce.input.fileinputformat.split.maxsize blocksize:值為hdfs的對應文件的blocksize
配置讀取目錄下文件數量的線程數:public static final String LIST_STATUS_NUM_THREADS =
"mapreduce.input.fileinputformat.list-status.num-threads";
因此,默認情況下,Math.max(minSize, Math.min(maxSize, blockSize));切片大小=blocksize
maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小。
minsize(切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。
選擇併發數的影響因素:
1、運算節點的硬件配置
2、運算任務的類型:CPU密集型還是IO密集型
3、運算任務的數據量
3、hadoop2.6.4源碼解析
org.apache.hadoop.mapreduce.JobSubmitter類
//得到job的map任務的並行數量 private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration; int maps; if (jConf.getUseNewMapper) { maps = writeNewSplits(job, jobSubmitDir);} else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; } @SuppressWarnings("unchecked") private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration; InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass, conf); List<InputSplit> splits = input.getSplits(job);T array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
切片計算邏輯,關注紅色字體代碼即可。
public List<InputSplit> getSplits(JobContext job) throws IOException { Stopwatch sw = new Stopwatch.start;
long minSize = Math.max(getFormatMinSplitSize, getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>; List<FileStatus> files = listStatus(job);
//遍歷文件,對每一個文件進行如下處理:獲得文件的blocksize,獲取文件的長度,得到切片信息(spilt 文件路徑,切片編號,偏移量範圍) for (FileStatus file: files) { Path path = file.getPath; long length = file.getLen; if (length != 0) { BlockLocation blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations; } else { FileSystem fs = path.getFileSystem(job.getConfiguration); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) {long blockSize = file.getBlockSize; long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts, blkLocations[blkIndex].getCachedHosts)); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts, blkLocations[blkIndex].getCachedHosts)); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts, blkLocations[0].getCachedHosts)); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration.setLong(NUM_INPUT_FILES, files.size); sw.stop; if (LOG.isDebugEnabled) { LOG.debug("Total # of splits generated by getSplits: " + splits.size + ", TimeTaken: " + sw.elapsedMillis); } return splits; }
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"; public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"; long minSize = Math.max(getFormatMinSplitSize, getMinSplitSize(job)); //保證切分的文件長度最小不得小於1字節 protected long getFormatMinSplitSize { return 1; } //如果沒有在conf中設置SPLIT_MINSIZE參數,則取默認值1字節。 public static long getMinSplitSize(JobContext job) { return job.getConfiguration.getLong(SPLIT_MINSIZE, 1L); } //得到切片文件的最大長度 long maxSize = getMaxSplitSize(job); //如果沒有在conf中設置SPLIT_MAXSIZE參數,則去默認值Long.MAX_VALUE字節。 public static long getMaxSplitSize(JobContext context) { return context.getConfiguration.getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); } //讀取指定目錄下的所有文件的信息 List<FileStatus> files = listStatus(job); //如果沒有指定開啟幾個線程讀取,則默認一個線程去讀文件信息,因為存在目錄下有上億個文件的情況,所以有需要開啟多個線程加快讀取。 int numThreads = job.getConfiguration.getInt(LIST_STATUS_NUM_THREADS, DEFAULT_LIST_STATUS_NUM_THREADS); public static final String LIST_STATUS_NUM_THREADS = "mapreduce.input.fileinputformat.list-status.num-threads"; public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1; //計算切片文件的邏輯大小 long splitSize = computeSplitSize(blockSize, minSize, maxSize); protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } private static final double SPLIT_SLOP = 1.1; // 10% slop //判斷剩餘文件與切片大小的比是否為1.1. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts, blkLocations[blkIndex].getCachedHosts)); bytesRemaining -= splitSize; }
map並行度
如果job的每個map或者reduce的task的運行時間都只有30-40秒鐘(最好每個map的執行時間最少不低於一分鐘),那麼就減少該job的map或者reduce數。每一個task的啟動和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鐘,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
配置task的JVM重用可以改善該問題:
(mapred.job.reuse.jvm.num.tasks,默認是1,表示一個JVM上最多可以順序執行的task數目(屬於同一個Job)是1。也就是說一個task啟一個JVM)。
小文件的場景下,默認的切片機制會造成大量的maptask處理很少量的數據,效率低下:
解決方案:
推薦:把小文件存入hdfs之前進行預處理,先合併為大文件後再上傳。
折中:寫程序對hdfs上小文件進行合併再跑job處理。
補救措施:如果大量的小文件已經存在hdfs上了,使用combineInputFormate組件,它可以將眾多的小文件從邏輯上規劃到一個切片中,這樣多個小文件就可以交給一個maptask操作了。
最近實在是不知道學點什麼了呦,就把hadoop回顧一下,當初學時,為了快速上手,都是記各種理論以及結論,沒有時間去看源碼驗證,也不知道人家說的結論是否正確,這次回滾就是看源碼驗證當初結論的正確性。這也快一年沒有用了,最近一直從事分佈式實時計算的研究。