7個實例全面掌握Hadoop MapReduce

編程語言 MapReduce Hadoop HDFS 中國統計網 2017-06-09

7個實例全面掌握Hadoop MapReduce

作者介紹

杜亦舒,創業中,技術合夥人,喜歡研究分享技術。個人訂閱號:性能與架構。

本文旨在幫您快速瞭解 MapReduce 的工作機制和開發方法,解決以下幾個問題:

  • MapReduce 基本原理是什麼?

  • MapReduce 的執行過程是怎麼樣的?

  • MapReduce 的核心流程細節

  • 如何進行 MapReduce 程序開發?(通過7個實例逐漸掌握)

文章中提供了程序實例中涉及到的測試數據文件,可以直接下載使用。

關於實踐環境,如果您不喜歡自己搭建Hadoop環境,可以下載使用本教程提供的環境,實踐部分內容中會介紹具體使用方法。

通過學習並實踐完成後,可以對 MapReduce 工作原理有比較清晰的認識,並掌握 MapReduce 的編程思路。

大綱:

一、MapReduce 基本原理

二、MapReduce 入門示例 - WordCount 單詞統計

三、MapReduce 執行過程分析

  • 實例1 - 自定義對象序列化

  • 實例2 - 自定義分區

  • 實例3 - 計算出每組訂單中金額最大的記錄

  • 實例4 - 合併多個小文件

  • 實例5 - 分組輸出到多個文件

四、MapReduce 核心流程梳理

  • 實例6 - join 操作

  • 實例7 - 計算出用戶間的共同好友

五、下載方式

一、MapReduce基本原理

MapReduce是一種編程模型,用於大規模數據集的分佈式運算。

1、MapReduce通俗解釋

圖書館要清點圖書數量,有10個書架,管理員為了加快統計速度,找來了10個同學,每個同學負責統計一個書架的圖書數量。

張同學統計 書架1

王同學統計 書架2

劉同學統計 書架3

……

過了一會兒,10個同學陸續到管理員這彙報自己的統計數字,管理員把各個數字加起來,就得到了圖書總數。

這個過程就可以理解為MapReduce的工作過程。

2、MapReduce中有兩個核心操作

(1)map

管理員分配哪個同學統計哪個書架,每個同學都進行相同的“統計”操作,這個過程就是map。

(2)reduce

每個同學的結果進行彙總,這個過程是reduce。

3、MapReduce工作過程拆解

下面通過一個景點案例(單詞統計)看MapReduce是如何工作的。

有一個文本文件,被分成了4份,分別放到了4臺服務器中存儲

Text1:the weather is good

Text2:today is good

Text3:good weather is good

Text4:today has good weather

現在要統計出每個單詞的出現次數。

7個實例全面掌握Hadoop MapReduce

處理過程

(1)拆分單詞

  • map節點1

輸入:“the weather is good”

輸出:(the,1),(weather,1),(is,1),(good,1)

7個實例全面掌握Hadoop MapReduce

  • map節點2

輸入:“today is good”

輸出:(today,1),(is,1),(good,1)

7個實例全面掌握Hadoop MapReduce

  • map節點3

輸入:“good weather is good”

輸出:(good,1),(weather,1),(is,1),(good,1)

7個實例全面掌握Hadoop MapReduce

  • map節點4

輸入:“today has good weather”

輸出:(today,1),(has,1),(good,1),(weather,1)

7個實例全面掌握Hadoop MapReduce

(2)排序

  • map節點1

7個實例全面掌握Hadoop MapReduce

  • map節點2

7個實例全面掌握Hadoop MapReduce

  • map節點3

7個實例全面掌握Hadoop MapReduce

  • map節點4

7個實例全面掌握Hadoop MapReduce

(3)合併

  • map節點1

7個實例全面掌握Hadoop MapReduce

  • map節點2

7個實例全面掌握Hadoop MapReduce

  • map節點3

7個實例全面掌握Hadoop MapReduce

  • map節點4

7個實例全面掌握Hadoop MapReduce

(4)彙總統計

每個map節點都完成以後,就要進入reduce階段了。

例如使用了3個reduce節點,需要對上面4個map節點的結果進行重新組合,比如按照26個字母分成3段,分配給3個reduce節點。

Reduce節點進行統計,計算出最終結果。

7個實例全面掌握Hadoop MapReduce

這就是最基本的MapReduce處理流程。

4、MapReduce編程思路

瞭解了MapReduce的工作過程,我們思考一下用代碼實現時需要做哪些工作?

  1. 在4個服務器中啟動4個map任務

  2. 每個map任務讀取目標文件,每讀一行就拆分一下單詞,並記下來次單詞出現了一次

  3. 目標文件的每一行都處理完成後,需要把單詞進行排序

  4. 在3個服務器上啟動reduce任務

  5. 每個reduce獲取一部分map的處理結果

  6. reduce任務進行彙總統計,輸出最終的結果數據

但不用擔心,MapReduce是一個非常優秀的編程模型,已經把絕大多數的工作做完了,我們只需要關心2個部分:

  1. map處理邏輯——對傳進來的一行數據如何處理?輸出什麼信息?

  2. reduce處理邏輯——對傳進來的map處理結果如何處理?輸出什麼信息?

編寫好這兩個核心業務邏輯之後,只需要幾行簡單的代碼把map和reduce裝配成一個job,然後提交給Hadoop集群就可以了。

至於其它的複雜細節,例如如何啟動map任務和reduce任務、如何讀取文件、如對map結果排序、如何把map結果數據分配給reduce、reduce如何把最終結果保存到文件等等,MapReduce框架都幫我們做好了,而且還支持很多自定義擴展配置,例如如何讀文件、如何組織map或者reduce的輸出結果等等,後面的示例中會有介紹。

二、MapReduce入門示例:WordCount單詞統計

WordCount是非常好的入門示例,相當於helloword,下面就開發一個WordCount的MapReduce程序,體驗實際開發方式。

1、安裝Hadoop實踐環境

您可以選擇自己搭建環境,也可以使用打包好的Hadoop環境(版本2.7.3)。

這個Hadoop環境實際上是一個虛機鏡像,所以需要安裝virtualbox虛擬機、vagrant鏡像管理工具,和我的Hadoop鏡像,然後用這個鏡像啟動虛機就可以了,下面是具體操作步驟:

(1)安裝virtualbox

(2)安裝vagrant

(3)下載Hadoop鏡像

(4)啟動

加載Hadoop鏡像

vagrant box add {自定義鏡像名稱} {鏡像所在路徑}

例如您想命名為Hadoop,鏡像下載後的路徑為d:\hadoop.box,加載命令就是這樣:

vagrant box add hadoop d:\hadoop.box

創建工作目錄,例如d:\hdfstest。

進入此目錄,初始化

cd d:\hdfstest

vagrant init hadoop

啟動虛機

vagrant up

啟動完成後,就可以使用SSH客戶端登錄虛機了

IP 127.0.0.1

端口 2222

用戶名 root

密碼 vagrant

在Hadoop服務器中啟動HDFS和Yarn,之後就可以運行MapReduce程序了

start-dfs.sh

start-yarn.sh

2、創建項目

注:流程是在本機開發,然後打包,上傳到Hadoop服務器上運行。

新建項目目錄wordcount,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在的目錄結構

7個實例全面掌握Hadoop MapReduce

3、代碼

mapper程序:src/main/java/WordcountMapper.java

內容:

7個實例全面掌握Hadoop MapReduce

這裡定義了一個mapper類,其中有一個map方法。MapReduce框架每讀到一行數據,就會調用一次這個map方法。

map的處理流程就是接收一個key value對兒,然後進行業務邏輯處理,最後輸出一個key value對兒。

Mapper<LongWritable, Text, Text, IntWritable>

其中的4個類型分別是:輸入key類型、輸入value類型、輸出key類型、輸出value類型。

MapReduce框架讀到一行數據侯以key value形式傳進來,key默認情況下是mr礦機所讀到一行文本的起始偏移量(Long類型),value默認情況下是mr框架所讀到的一行的數據內容(String類型)。

輸出也是key value形式的,是用戶自定義邏輯處理完成後定義的key,用戶自己決定用什麼作為key,value是用戶自定義邏輯處理完成後的value,內容和類型也是用戶自己決定。

此例中,輸出key就是word(字符串類型),輸出value就是單詞數量(整型)。

這裡的數據類型和我們常用的不一樣,因為MapReduce程序的輸出數據需要在不同機器間傳輸,所以必須是可序列化的,例如Long類型,Hadoop中定義了自己的可序列化類型LongWritable,String對應的是Text,int對應的是IntWritable。

reduce程序:src/main/java/WordCountReducer.java

7個實例全面掌握Hadoop MapReduce

這裡定義了一個Reducer類和一個reduce方法。

當傳給reduce方法時,就變為:

Reducer<Text, IntWritable, Text, IntWritable>

4個類型分別指:輸入key的類型、輸入value的類型、輸出key的類型、輸出value的類型。

需要注意,reduce方法接收的是:一個字符串類型的key、一個可迭代的數據集。因為reduce任務讀取到map任務處理結果是這樣的:

(good,1)(good,1)(good,1)(good,1)

當傳給reduce方法時,就變為:

key:good

value:(1,1,1,1)

所以,reduce方法接收到的是同一個key的一組value。

主程序:src/main/java/WordCountMapReduce.java

7個實例全面掌握Hadoop MapReduce

這個main方法就是用來組裝一個job並提交執行

4、編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件。

現在項目文件結構:

7個實例全面掌握Hadoop MapReduce

5、運行

先把target中的jar上傳到Hadoop服務器,然後在Hadoop服務器的HDFS中準備測試文件(把Hadoop所在目錄下的txt文件都上傳到HDFS)

cd $HADOOP_HOME

hdfs dfs -mkdir -p /wordcount/input

hdfs dfs -put *.txt /wordcount/input

執行wordcount jar

hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar WordCountMapR

educe /wordcount/input /wordcount/output

執行完成後驗證

hdfs dfs -cat /wordcount/output/*

可以看到單詞數量統計結果。

三、MapReduce執行過程分析

下面看一下從job提交到執行完成這個過程是怎樣。

(1)客戶端提交任務

Client提交任務時會先到HDFS中查看目標文件的大小,瞭解要獲取的數據的規模,然後形成任務分配的規劃,例如:

a.txt 0-128M交給一個task,128-256M 交給一個task,b.txt 0-128M交給一個task,128-256M交給一個task ...,形成規劃文件job.split。

然後把規劃文件job.split、jar、配置文件xml提交給yarn(Hadoop集群資源管理器,負責為任務分配合適的服務器資源)

7個實例全面掌握Hadoop MapReduce

(2)啟動appmaster

注:appmaster是本次job的主管,負責maptask和reducetask的啟動、監控、協調管理工作。

yarn找一個合適的服務器來啟動appmaster,並把job.split、jar、xml交給它。

7個實例全面掌握Hadoop MapReduce

(3)啟動maptask

Appmaster啟動後,根據固化文件job.split中的分片信息啟動maptask,一個分片對應一個maptask。

分配maptask時,會盡量讓maptask在目標數據所在的datanode上執行。

7個實例全面掌握Hadoop MapReduce

(4)執行maptask

Maptask會一行行地讀目標文件,交給我們寫的map程序,讀一行就調一次map方法,map調用context.write把處理結果寫出去,保存到本機的一個結果文件,這個文件中的內容是分區且有序的。

分區的作用就是定義哪些key在一組,一個分區對應一個reducer。

7個實例全面掌握Hadoop MapReduce

(5)啟動reducetask

Maptask都運行完成後,appmaster再啟動reducetask,maptask的結果中有幾個分區就啟動幾個reducetask。

7個實例全面掌握Hadoop MapReduce

(6)執行reducetask

reducetask去讀取maptask的結果文件中自己對應的那個分區數據,例如reducetask_01去讀第一個分區中的數據。

reducetask把讀到的數據按key組織好,傳給reduce方法進行處理,處理結果寫到指定的輸出路徑。

7個實例全面掌握Hadoop MapReduce

四、實例1:自定義對象序列化

1、需求與實現思路

(1)需求

需要統計手機用戶流量日誌,日誌內容實例:

7個實例全面掌握Hadoop MapReduce

要把同一個用戶的上行流量、下行流量進行累加,並計算出綜合。

例如上面的13897230503有兩條記錄,就要對這兩條記錄進行累加,計算總和,得到:

13897230503,500,1600,2100

(2)實現思路

  • map

接收日誌的一行數據,key為行的偏移量,value為此行數據。

輸出時,應以手機號為key,value應為一個整體,包括:上行流量、下行流量、總流量。

手機號是字符串類型Text,而這個整體不能用基本數據類型表示,需要我們自定義一個bean對象,並且要實現可序列化。

key: 13897230503

value: < upFlow:100, dFlow:300, sumFlow:400 >

  • reduce

接收一個手機號標識的key,及這個手機號對應的bean對象集合。

例如:

key:

13897230503

value:

< upFlow:400, dFlow:1300, sumFlow:1700 >,

< upFlow:100, dFlow:300, sumFlow:400 >

迭代bean對象集合,累加各項,形成一個新的bean對象,例如:

< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >

最後輸出:

key: 13897230503

value: < upFlow:500, dFlow:1600, sumFlow:2100 >

2、代碼實踐

(1)創建項目

新建項目目錄serializebean,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在項目目錄的文件結構

7個實例全面掌握Hadoop MapReduce

(2)代碼

自定義bean:src/main/java/FlowBean

7個實例全面掌握Hadoop MapReduce

MapReduce程序:src/main/java/FlowCount

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

(3)編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件。

現在項目文件結構:

7個實例全面掌握Hadoop MapReduce

(4)運行

先把target中的jar上傳到Hadoop服務器,然後下載測試數據文件。

上傳到HDFS

hdfs dfs -mkdir -p /flowcount/input

hdfs dfs -put flowdata.log /flowcount/input

運行

hadoop jar mapreduce-serializebean-0.0.1-SNAPSHOT.jar FlowCount

/flowcount/input /flowcount/output2

檢查

hdfs dfs -cat /flowcount/output/*

五、實例2:自定義分區

1、需求與實現思路

(1)需求

還是以上個例子的手機用戶流量日誌為例:

7個實例全面掌握Hadoop MapReduce

在上個例子的統計需要基礎上添加一個新需求:按省份統計,不同省份的手機號放到不同的文件裡。

例如137表示屬於河北,138屬於河南,那麼在結果輸出時,他們分別在不同的文件中。

(2)實現思路

map和reduce的處理思路與上例相同,這裡需要多做2步:

  • 自定義一個分區器Partitioner

根據手機號判斷屬於哪個分區。有幾個分區就有幾個reducetask,每個reducetask輸出一個文件,那麼,不同分區中的數據就寫入了不同的結果文件中。

7個實例全面掌握Hadoop MapReduce

  • 在main程序中指定使用我們自定義的Partitioner即可

2、代碼實踐

(1)創建項目

新建項目目錄custom_partion,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在項目目錄的文件結構

7個實例全面掌握Hadoop MapReduce

(2)代碼

自定義bean:src/main/java/FlowBean.java

7個實例全面掌握Hadoop MapReduce

自定義分區器:src/main/java/ProvincePartitioner.java

7個實例全面掌握Hadoop MapReduce

這段代碼是本示例的重點,其中定義了一個hashmap,假設其是一個數據庫,定義了手機號和分區的關係。

getPartition取得手機號的前綴,到數據庫中獲取區號,如果沒在數據庫中,就指定其為“其它分區”(用4代表)

MapReduce程序:src/main/java/FlowCount.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

main程序中指定了使用自定義的分區器

job.setPartitionerClass(ProvincePartitioner.class);

(3)編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件

現在項目文件結構

7個實例全面掌握Hadoop MapReduce

(4)運行

先把target中的jar上傳到Hadoop服務器

運行

hadoop jar mapreduce-custompartion-0.0.1-SNAPSHOT.jar FlowCount

/flowcount/input /flowcount/output-part

檢查

hdfs dfs -ls /flowcount/output-part

六、實例3:計算出每組訂單中金額最大的記錄

1、需求與實現思路

(1)需求

有如下訂單數據:

7個實例全面掌握Hadoop MapReduce

需要求出每一個訂單中成交金額最大的一筆交易。

(2)實現思路

先介紹一個概念GroupingComparator組比較器,通過WordCount來理解它的作用。

WordCount中map處理完成後的結果數據是這樣的:

<good,1>

<good,1>

<good,1>

<is,1>

<is,1>

Reducer會把這些數據都讀進來,然後進行分組,把key相同的放在一組,形成這樣的形式:

<good, [1,1,1]>

<is, [1,1]>

然後對每一組數據調用一次reduce( key, Iterable, ...)方法。

其中分組的操作就需要用到GroupingComparator,對key進行比較,相同的放在一組。

注:上例中的Partitioner是屬於mapDuang的,GroupingComparator是屬於reduce端的。

下面看整體實現思路。

1)定義一個訂單bean

屬性包括:訂單號、金額

{ itemid, amount }

要實現可序列化,與比較方法compareTo,比較規則:訂單號不同的,按照訂單好比較,相同的,按照金額比較。

2)定義一個Partitioner

根據訂單號的hashcode分區,可以保證訂單號相同的在同一個分區,以便reduce中接收到同一個訂單的全部記錄。

同分區的數據是序的,這就用到了bean中的比較方法,可以讓訂單號相同的記錄按照金額從大到小排序。

在map方法中輸出數據時,key就是bean,value為null。

map的結果數據形式例如:

7個實例全面掌握Hadoop MapReduce

3)定義一個GroupingComparator

因為map的結果數據中key是bean,不是普通數據類型,所以需要使用自定義的比較器來分組,就使用bean中的訂單號來比較。

例如讀取到分區1的數據:

<{ Order_0000001 222.8 }, null>,

<{ Order_0000001 25.8 }, null>,

<{ Order_0000003 222.8 }, null>

進行比較,前兩條數據的訂單號相同,放入一組,默認是以第一條記錄的key作為這組記錄的key。

分組後的形式如下:

<{ Order_0000001 222.8 }, [null, null]>,

<{ Order_0000003 222.8 }, [null]>

在reduce方法中收到的每組記錄的key就是我們最終想要的結果,所以直接輸出到文件就可以了。

7個實例全面掌握Hadoop MapReduce

2、代碼實踐

(1)創建項目

新建項目目錄groupcomparator,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在項目目錄的文件結構

7個實例全面掌握Hadoop MapReduce

(2)代碼

**自定義bean:** src/main/java/OrderBean.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

自定義分區器:src/main/java/ItemIdPartitioner.java

7個實例全面掌握Hadoop MapReduce

自定義比較器:src/main/java/MyGroupingComparator.java

7個實例全面掌握Hadoop MapReduce

MapReduce程序:src/main/java/GroupSort.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

(3)編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件

現在項目文件結構

7個實例全面掌握Hadoop MapReduce

(4)運行

先把target中的jar上傳到Hadoop服務器

下載測試數據文件

上傳到HDFS

hdfs dfs -put orders.txt /

運行

hadoop jar mapreduce-groupcomparator-0.0.1-SNAPSHOT.jar GroupSo

rt /orders.txt /outputOrders

檢查

hdfs dfs -ls /outputOrders

hdfs dfs -cat /outputOrders/*

七、實例4:合併多個小文件

1、需求與實現思路

(1)需求

要計算的目標文件中有大量的小文件,會造成分配任務和資源的開銷比實際的計算開銷還打,這就產生了效率損耗。

需要先把一些小文件合併成一個大文件。

(2)實現思路

文件的讀取由map負責,在前面的示意圖中可以看到一個inputformat用來讀取文件,然後以key value形式傳遞給map方法。

我們要自定義文件的讀取過程,就需要了解其細節流程:

7個實例全面掌握Hadoop MapReduce

所以我們需要自定義一個inputformat和RecordReader。

Inputformat使用我們自己的RecordReader,RecordReader負責實現一次讀取一個完整文件封裝為key value。

map接收到文件內容,然後以文件名為key,以文件內容為value,向外輸出的格式要注意,要使用SequenceFileOutPutFormat(用來輸出對象)。

因為reduce收到的key value都是對象,不是普通的文本,reduce默認的輸出格式是TextOutputFormat,使用它的話,最終輸出的內容就是對象ID,所以要使用SequenceFileOutPutFormat進行輸出。

2、代碼實踐

(1)創建項目inputformat,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在項目目錄文件結構

7個實例全面掌握Hadoop MapReduce

(2)代碼

自定義inputform:src/main/java/MyInputFormat.java

7個實例全面掌握Hadoop MapReduce

createRecordReader方法中創建一個自定義的reader

自定義reader:src/main/java/MyRecordReader.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

其中有3個核心方法:nextKeyValue、getCurrentKey、getCurrentValue。

nextKeyValue負責生成要傳遞給map方法的key和value。getCurrentKey、getCurrentValue是實際獲取key和value的。所以RecordReader的核心機制就是:通過nextKeyValue生成key value,然後通過getCurrentKey和getCurrentValue來返回上面構造好的key value。這裡的nextKeyValue負責把整個文件內容作為value。

7個實例全面掌握Hadoop MapReduce

MapReduce程序:src/main/java/ManyToOne.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

main程序中指定使用我們自定義的MyInputFormat,輸出使用SequenceFileOutputFormat。

(3)編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件。

現在項目文件結構

7個實例全面掌握Hadoop MapReduce

(4)運行

先把target中的jar上傳到Hadoop服務器。

準備測試文件,把Hadoop目錄中的配置文件上傳到HDFS

hdfs dfs -mkdir /files

hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /files

運行

hadoop jar mapreduce-inputformat-0.0.1-SNAPSHOT.jar ManyToOne /

files /onefile

檢查

hdfs dfs -ls /onefile

八、實例5:分組輸出到多個文件

1、需求與實現思路

(1)需求

7個實例全面掌握Hadoop MapReduce

需要把相同訂單id的記錄放在一個文件中,並以訂單id命名。

(2)實現思路

這個需求可以直接使用MultipleOutputs這個類來實現。

默認情況下,每個reducer寫入一個文件,文件名由分區號命名,例如'part-r-00000',而 MultipleOutputs可以用key作為文件名,例如‘Order_0000001-r-00000’。

所以,思路就是map中處理每條記錄,以‘訂單id’為key,reduce中使用MultipleOutputs進行輸出,會自動以key為文件名,文件內容就是相同key的所有記錄。

例如‘Order_0000001-r-00000’的內容就是:

Order_0000001,Pdt_05,25.8

Order_0000001,Pdt_01,222.8

2、代碼實踐

(1)創建項目

新建項目目錄multioutput,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在項目目錄的文件結構

7個實例全面掌握Hadoop MapReduce

(2)代碼

MapReduce程序:src/main/java/MultipleOutputTest.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

(3)編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件。

現在項目文件結構

7個實例全面掌握Hadoop MapReduce

(4)運行

先把target中的jar上傳到Hadoop服務器

然後運行

hadoop jar mapreduce-multipleOutput-0.0.1-SNAPSHOT.jar Multiple

OutputTest /orders.txt /output-multi

檢查

hdfs dfs -ls /output-multi

九、MapReduce核心流程梳理

我們已經瞭解了MapReduce的大概流程:

(1)maptask從目標文件中讀取數據

(2)mapper的map方法處理每一條數據,輸出到文件中

(3)reducer讀取map的結果文件,進行分組,把每一組交給reduce方法進行處理,最後輸出到指定路徑。

7個實例全面掌握Hadoop MapReduce

這是最基本的流程,有助於快速理解MapReduce的工作方式。

通過上面的幾個示例,我們要經接觸了一些更深入的細節,例如mapper的inputform中還有RecordReader、reducer中還有GroupingComparator。

下面就看一下更加深入的處理流程。

1、Maptask中的處理流程

(1)讀文件流程

7個實例全面掌握Hadoop MapReduce

目標文件會被按照規劃文件進行切分,inputformat調用RecordReader讀取文件切片,RecordReader會生成key value對兒,傳遞給Mapper的mao方法。

(2)寫入結果文件的流程

從Mapper的map方法調用context.write之後,到形成結果數據文件這個過程是比較複雜的。

7個實例全面掌握Hadoop MapReduce

context.write不是直接寫入文件,而是把數據交給OutputCollector,OutputCollector把數據寫入‘環形緩衝區’。‘環形緩衝區’中的數據會進行排序。

因為緩衝區的大小是有限制的,所以每當快滿時(達到80%)就要把其中的數據寫出去,這個過程叫做數據溢出。

溢出到一個文件中,溢出過程會對這批數據進行分組、比較操作,然後吸入文件,所以溢出文件中的數據是分好區的,並且是有序的。每次溢出都會產生一個溢出數據文件,所以會有多個。

當map處理完全數據後,就會對各個溢出數據文件進行合併,每個文件中相同區的數據放在一起,並再次排序,最後得到一個整體的結果文件,其中是分區且有序的。

這樣就完成了map過程,讀數據過程和寫結果文件的過程聯合起來如下圖:

7個實例全面掌握Hadoop MapReduce

2、Reducetask的處理流程

7個實例全面掌握Hadoop MapReduce

reducetask去讀每個maptask產生的結果文件中自己所負責的分區數據,讀到自己本地。對多個數據文件進行合併排序,然後通過GroupingComparator進行分組,把相同key的數據放到一組。對每組數據調一次reduce方法,處理完成後寫入目標路徑文件。

3、整體流程

把map和reduce的過程聯合起來:

7個實例全面掌握Hadoop MapReduce

十、實例6:join操作

1、需求與實現思路

(1)需求

有2個數據文件:訂單數據、商品信息。

訂單數據表order

7個實例全面掌握Hadoop MapReduce

商品信息表product

7個實例全面掌握Hadoop MapReduce

需要用MapReduce程序來實現下面這個SQL查詢運算:

select o.id order_id, o.date, o.amount, p.id p_id, p.pname, p.c

ategory_id, p.price

from t_order o join t_product p on o.pid = p.id

(2)實現思路

SQL的執行結果是這樣的:

7個實例全面掌握Hadoop MapReduce

實際上就是給每條訂單記錄補充上商品表中的信息。

實現思路:

1)定義bean

把SQL執行結果中的各列封裝成一個bean對象,實現序列化。

bean中還要有一個另外的屬性flag,用來標識此對象的數據是訂單還是商品。

2)map處理

map會處理兩個文件中的數據,根據文件名可以知道當前這條數據是訂單還是商品。

對每條數據創建一個bean對象,設置對應的屬性,並標識flag(0代表order,1代表product)

以join的關聯項“productid”為key,bean為value進行輸出。

3)reduce處理

reduce方法接收到pid相同的一組bean對象。

遍歷bean對象集合,如果bean是訂單數據,就放入一個新的訂單集合中,如果是商品數據,就保存到一個商品bean中。然後遍歷那個新的訂單集合,使用商品bean的數據對每個訂單bean進行信息補全。

這樣就得到了完整的訂單及其商品信息。

2、代碼實踐

(1)創建項目

新建項目目錄jointest,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在項目目錄的文件結構

7個實例全面掌握Hadoop MapReduce

(2)代碼

**封裝bean:** src/main/java/InfoBean.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

MapReduce程序:src/main/java/JoinMR.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

(3)編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件。

現在項目文件結構

7個實例全面掌握Hadoop MapReduce

(4)運行

先把target中的jar上傳到Hadoop服務器

下載產品和訂單的測試數據文件

上傳到HDFS

hdfs dfs -mkdir -p /jointest/input

hdfs dfs -put order.txt /jointest/input

hdfs dfs -put product.txt /jointest/input

運行

hadoop jar joinmr.jar com.dys.mapreducetest.join.JoinMR /jointe

st/input /jointest/output

檢查

hdfs dfs -cat /jointest/output/*

十一、實例7:計算出用戶間的共同好友

1、需求與實現思路

(1)需求

下面是用戶的好友關係列表,每一行代表一個用戶和他的好友列表。

7個實例全面掌握Hadoop MapReduce

需要求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰。

例如從前2天記錄中可以看出,C、E是A、B的共同好友,最終的形式如下:

7個實例全面掌握Hadoop MapReduce

(2)實現思路

之前的示例中都是一個MapReduce計算出來的,這裡我們使用2個MapReduce來實現。

1)第1個MapReduce

  • map

找出每個用戶都是誰的好友,例如:

讀一行A:B,C,D,F,E,O(A的好友有這些,反過來拆開,這些人中的每一個都是A的好友)

輸出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>

再讀一行B:A,C,E,K

輸出<A,B> <C,B> <E,B> <K,B>

……

  • reduce

key相同的會分到一組,例如:

<C,A><C,B><C,E><C,F><C,G>......

Key:C

value: [ A, B, E, F, G ]

意義是:C是這些用戶的好友。

遍歷value就可以得到:

A B 有共同好友C

A E 有共同好友C

...

B E有共同好友 C

B F有共同好友 C

輸出:

<A-B,C>

<A-E,C>

<A-F,C>

<A-G,C>

<B-E,C>

<B-F,C>

.....

2)第2個MapReduce

對上一步的輸出結果進行計算。

  • map

讀出上一步的結果數據,組織成key value直接輸出

例如:

讀入一行<A-B,C>

直接輸出<A-B,C>

  • reduce

讀入數據,key相同的在一組

<A-B,C><A-B,F><A-B,G>......

輸出:

A-B C,F,G,.....

這樣就得出了兩個用戶間的共同好友列表

2、代碼實踐

(1)創建項目

新建項目目錄jointest,其中新建文件pom.xml,內容:

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

然後創建源碼目錄src/main/java

現在項目目錄的文件結構

7個實例全面掌握Hadoop MapReduce

(2)代碼

第一步的MapReduce程序:src/main/java/StepFirst.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

第二步的MapReduce程序:src/main/java/StepSecond.java

7個實例全面掌握Hadoop MapReduce

7個實例全面掌握Hadoop MapReduce

(3)編譯打包

在pom.xml所在目錄下執行打包命令:

mvn package

執行完成後,會自動生成target目錄,其中有打包好的jar文件。

現在項目文件結構

7個實例全面掌握Hadoop MapReduce

(4)運行

先把target中的jar上傳到Hadoop服務器

下載測試數據文件

上傳到HDFS

hdfs dfs -mkdir -p /friends/input

hdfs dfs -put friendsdata.txt /friends/input

運行第一步

hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepFirst /frie

nds/input/friendsdata.txt /friends/output01

運行第二步

hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepSecond /fri

ends/output01/part-r-00000 /friends/output02

查看結果

hdfs dfs -ls /friends/output02hdfs dfs -cat /friends/output02/*

7個實例全面掌握Hadoop MapReduce

十二、小結

MapReduce的基礎內容介紹完了,希望可以幫助您快速熟悉MapReduce的工作原理和開發方法。如有批評與建議(例如內容有誤、不足的地方、改進建議等),歡迎留言討論。

End.

文章轉載自:DBAplus社群

中國統計網,是國內最早的大數據學習網站,歡迎關注!

7個實例全面掌握Hadoop MapReduce

相關推薦

推薦中...