尚學堂Java學習筆記:MapReduce 工作原理

編程語言 MapReduce Java HDFS 北京尚學堂Java學院 北京尚學堂Java學院 2017-08-27

我們通過 Client、JobTrask 和 TaskTracker 的角度來分析 MapReduce 的工作原理:

首先在客戶端(Client)啟動一個作業(Job),向 JobTracker 請求一個 Job ID。將運行作業所需要的資源文件複製到 HDFS 上,包括 MapReduce 程序打包的 JAR 文件、配置文件和客戶端計算所得的輸入劃分信息。這些文件都存放在 JobTracker 專門為該作業創建的文件夾中,文件夾名為該作業的Job ID 。JAR文件默認會有10個副本(mapred.submit.replication 屬性控制);輸入劃分信息告訴了 JobTracker應該為這個作業啟動多少個 map 任務等信息。

JobTracker 接收到作業後,將其放在一個作業隊列裡,等待作業調度器對其進行調度 當作業調度器根據自己的調度算法調度到該作業時,會根據輸入劃分信息為每個劃分創建一個 map 任務,並將 map 任務分配給TaskTracker 執行。對於 map 和 reduce 任務,TaskTracker 根據主機核的數量和內存的大小有固定數量的 map 槽和reduce槽。這裡需要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這裡就涉及到上面提到的數據本地化(Data-Local)。

尚學堂Java學習筆記:MapReduce 工作原理

TaskTracker 每隔一段時間會給JobTracker 發送一個心跳,告訴 JobTracker 它依然在 運行,同時心跳中還攜帶著很多的信息,比如當前map任務完成的進度等信息。當JobTracker 收到作業的最後一個任務完成信息時,便把該作業設置成“成功”。當JobClient 查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。

Map 端流程:

1)每個輸入分片會讓一個map任務來處理,map 輸出的結果會暫且放在一個環形內存緩衝區中(該緩衝區的大小默認為100M,由io.sort.mb 屬性控制),當該緩衝區快要溢出時(默認為緩衝區大小的 80%,由io.sort.spill.percent 屬性控制),會在本地文件系統中創建一個溢出文件,將該緩衝區中的數據寫入這個文件。

2)在寫入磁盤之前,線程首先根據reduce 任務的數目將數據劃分為相同數目的分區,也就是一個 reduce 任務對應一個分區的數據。這樣做是為了避免有些 reduce 任務分配到大量數據,而有些 reduce任務卻分到很少數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行 hash 的過程。然後對每個分區中的數據進行排 序,如果此時設置了 Combiner,將排序後的結果進行 Combine 操作,這樣做的目的是讓儘可能少的數據寫入到磁盤。

3) 當 map 任務輸出最後一個記錄時,可能會有很多的溢出文件,這時需要將這些文件合併。合併的過程中會不斷地進行排序和 Combine 操作,目的有兩個:

儘量減少每次寫入磁盤的數據量;

儘量減少下一複製階段網絡傳輸的數據量。最後合併成了一個已分區且已排序的文件。為了減少網絡傳輸的數據量,這裡可以將數據壓縮,只要將 mapred.compress.map.out 設置為true 就可以了。

4) 將分區中的數據拷貝給相對應的 reduce 任務。分區中的數據怎麼知道它對應的 reduce 是哪個呢?其實 map 任務一直和其父 TaskTracker 保持聯繫,而 TaskTracker 又一直和JobTracker 保持心跳。所以 JobTracker 中保存了整個集群中的宏觀信息。只要 reduce 任務向JobTracker 獲取對應的map 輸出位置就可以了。

尚學堂Java學習筆記:MapReduce 工作原理

Reduce 端流程:

1) Reduce 會接收到不同map任務傳來的數據,並且每個map傳來的數據都是有序的。如果 reduce 端接受的數據量相當小,則直接存儲在內存中(緩衝區大小由 mapred.job.shuffle.input.buffer.percent 屬性控制,表示用作此用途的堆空間的百分比),如果數據量超過了該緩衝區大小的一定比例(由mapred.job.shuffle.merge.percent決定),則對數據合併後溢寫到磁盤中。

2) 隨著溢寫文件的增多,後臺線程會將它們合併成一個更大的有序的文件,這樣做是 為了給後面的合併節省時間。其實不管在 map 端還是 reduce 端,MapReduce 都是反覆地執行排序,合併操作,所以排序是 hadoop 的靈魂。

3)合併的過程中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據儘可能地少,並且最後一次合併的結果並沒有寫入磁盤,而是直接輸入到reduce 函數。

在Map處理數據後,到 Reduce得到數據之前,這個流程在 MapReduce中可以看做是一個 Shuffle 的過程。

在經過 mapper 的運行後,我們得知mapper的輸出是這樣一個 key/value 對。到底當前的 key 應該交由哪個 reduce 去做呢,是需要現在決定的。MapReduce 提供 Partitioner 接口,它的作用就是根據 key 或 value 及 reduce 的數量來決定當前的這對輸 出數據最終應該交由哪個 reduce task 處理。默認對 key做hash後再以 reduce task 數量取模。默認的取模方式只是為了平均 reduce 的處理能力,如果用戶自己對 Partitioner 有需求,可以訂製並設置到 job 上。

相關推薦

推薦中...