作者介紹
杜亦舒,創業中,技術合夥人,喜歡研究分享技術。個人訂閱號:性能與架構。
本文旨在幫您快速瞭解 MapReduce 的工作機制和開發方法,解決以下幾個問題:
MapReduce 基本原理是什麼?
MapReduce 的執行過程是怎麼樣的?
MapReduce 的核心流程細節
如何進行 MapReduce 程序開發?(通過7個實例逐漸掌握)
文章中提供了程序實例中涉及到的測試數據文件,可以直接下載使用。
關於實踐環境,如果您不喜歡自己搭建Hadoop環境,可以下載使用本教程提供的環境,實踐部分內容中會介紹具體使用方法。
通過學習並實踐完成後,可以對 MapReduce 工作原理有比較清晰的認識,並掌握 MapReduce 的編程思路。
大綱:
一、MapReduce 基本原理
二、MapReduce 入門示例 - WordCount 單詞統計
三、MapReduce 執行過程分析
實例1 - 自定義對象序列化
實例2 - 自定義分區
實例3 - 計算出每組訂單中金額最大的記錄
實例4 - 合併多個小文件
實例5 - 分組輸出到多個文件
四、MapReduce 核心流程梳理
實例6 - join 操作
實例7 - 計算出用戶間的共同好友
五、下載方式
一、MapReduce基本原理
MapReduce是一種編程模型,用於大規模數據集的分佈式運算。
1、MapReduce通俗解釋
圖書館要清點圖書數量,有10個書架,管理員為了加快統計速度,找來了10個同學,每個同學負責統計一個書架的圖書數量。
張同學統計 書架1
王同學統計 書架2
劉同學統計 書架3
……
過了一會兒,10個同學陸續到管理員這彙報自己的統計數字,管理員把各個數字加起來,就得到了圖書總數。
這個過程就可以理解為MapReduce的工作過程。
2、MapReduce中有兩個核心操作
(1)map
管理員分配哪個同學統計哪個書架,每個同學都進行相同的“統計”操作,這個過程就是map。
(2)reduce
每個同學的結果進行彙總,這個過程是reduce。
3、MapReduce工作過程拆解
下面通過一個景點案例(單詞統計)看MapReduce是如何工作的。
有一個文本文件,被分成了4份,分別放到了4臺服務器中存儲
Text1:the weather is good
Text2:today is good
Text3:good weather is good
Text4:today has good weather
現在要統計出每個單詞的出現次數。
處理過程
(1)拆分單詞
map節點1
輸入:“the weather is good”
輸出:(the,1),(weather,1),(is,1),(good,1)
map節點2
輸入:“today is good”
輸出:(today,1),(is,1),(good,1)
map節點3
輸入:“good weather is good”
輸出:(good,1),(weather,1),(is,1),(good,1)
map節點4
輸入:“today has good weather”
輸出:(today,1),(has,1),(good,1),(weather,1)
(2)排序
map節點1
map節點2
map節點3
map節點4
(3)合併
map節點1
map節點2
map節點3
map節點4
(4)彙總統計
每個map節點都完成以後,就要進入reduce階段了。
例如使用了3個reduce節點,需要對上面4個map節點的結果進行重新組合,比如按照26個字母分成3段,分配給3個reduce節點。
Reduce節點進行統計,計算出最終結果。
這就是最基本的MapReduce處理流程。
4、MapReduce編程思路
瞭解了MapReduce的工作過程,我們思考一下用代碼實現時需要做哪些工作?
在4個服務器中啟動4個map任務
每個map任務讀取目標文件,每讀一行就拆分一下單詞,並記下來次單詞出現了一次
目標文件的每一行都處理完成後,需要把單詞進行排序
在3個服務器上啟動reduce任務
每個reduce獲取一部分map的處理結果
reduce任務進行彙總統計,輸出最終的結果數據
但不用擔心,MapReduce是一個非常優秀的編程模型,已經把絕大多數的工作做完了,我們只需要關心2個部分:
map處理邏輯——對傳進來的一行數據如何處理?輸出什麼信息?
reduce處理邏輯——對傳進來的map處理結果如何處理?輸出什麼信息?
編寫好這兩個核心業務邏輯之後,只需要幾行簡單的代碼把map和reduce裝配成一個job,然後提交給Hadoop集群就可以了。
至於其它的複雜細節,例如如何啟動map任務和reduce任務、如何讀取文件、如對map結果排序、如何把map結果數據分配給reduce、reduce如何把最終結果保存到文件等等,MapReduce框架都幫我們做好了,而且還支持很多自定義擴展配置,例如如何讀文件、如何組織map或者reduce的輸出結果等等,後面的示例中會有介紹。
二、MapReduce入門示例:WordCount單詞統計
WordCount是非常好的入門示例,相當於helloword,下面就開發一個WordCount的MapReduce程序,體驗實際開發方式。
1、安裝Hadoop實踐環境
您可以選擇自己搭建環境,也可以使用打包好的Hadoop環境(版本2.7.3)。
這個Hadoop環境實際上是一個虛機鏡像,所以需要安裝virtualbox虛擬機、vagrant鏡像管理工具,和我的Hadoop鏡像,然後用這個鏡像啟動虛機就可以了,下面是具體操作步驟:
(1)安裝virtualbox
(2)安裝vagrant
(3)下載Hadoop鏡像
(4)啟動
加載Hadoop鏡像
vagrant box add {自定義鏡像名稱} {鏡像所在路徑}
例如您想命名為Hadoop,鏡像下載後的路徑為d:\hadoop.box,加載命令就是這樣:
vagrant box add hadoop d:\hadoop.box
創建工作目錄,例如d:\hdfstest。
進入此目錄,初始化
cd d:\hdfstest
vagrant init hadoop
啟動虛機
vagrant up
啟動完成後,就可以使用SSH客戶端登錄虛機了
IP 127.0.0.1
端口 2222
用戶名 root
密碼 vagrant
在Hadoop服務器中啟動HDFS和Yarn,之後就可以運行MapReduce程序了
start-dfs.sh
start-yarn.sh
2、創建項目
注:流程是在本機開發,然後打包,上傳到Hadoop服務器上運行。
新建項目目錄wordcount,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在的目錄結構
3、代碼
mapper程序:src/main/java/WordcountMapper.java
內容:
這裡定義了一個mapper類,其中有一個map方法。MapReduce框架每讀到一行數據,就會調用一次這個map方法。
map的處理流程就是接收一個key value對兒,然後進行業務邏輯處理,最後輸出一個key value對兒。
Mapper<LongWritable, Text, Text, IntWritable>
其中的4個類型分別是:輸入key類型、輸入value類型、輸出key類型、輸出value類型。
MapReduce框架讀到一行數據侯以key value形式傳進來,key默認情況下是mr礦機所讀到一行文本的起始偏移量(Long類型),value默認情況下是mr框架所讀到的一行的數據內容(String類型)。
輸出也是key value形式的,是用戶自定義邏輯處理完成後定義的key,用戶自己決定用什麼作為key,value是用戶自定義邏輯處理完成後的value,內容和類型也是用戶自己決定。
此例中,輸出key就是word(字符串類型),輸出value就是單詞數量(整型)。
這裡的數據類型和我們常用的不一樣,因為MapReduce程序的輸出數據需要在不同機器間傳輸,所以必須是可序列化的,例如Long類型,Hadoop中定義了自己的可序列化類型LongWritable,String對應的是Text,int對應的是IntWritable。
reduce程序:src/main/java/WordCountReducer.java
這裡定義了一個Reducer類和一個reduce方法。
當傳給reduce方法時,就變為:
Reducer<Text, IntWritable, Text, IntWritable>
4個類型分別指:輸入key的類型、輸入value的類型、輸出key的類型、輸出value的類型。
需要注意,reduce方法接收的是:一個字符串類型的key、一個可迭代的數據集。因為reduce任務讀取到map任務處理結果是這樣的:
(good,1)(good,1)(good,1)(good,1)
當傳給reduce方法時,就變為:
key:good
value:(1,1,1,1)
所以,reduce方法接收到的是同一個key的一組value。
主程序:src/main/java/WordCountMapReduce.java
這個main方法就是用來組裝一個job並提交執行
4、編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件。
現在項目文件結構:
5、運行
先把target中的jar上傳到Hadoop服務器,然後在Hadoop服務器的HDFS中準備測試文件(把Hadoop所在目錄下的txt文件都上傳到HDFS)
cd $HADOOP_HOME
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put *.txt /wordcount/input
執行wordcount jar
hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar WordCountMapR
educe /wordcount/input /wordcount/output
執行完成後驗證
hdfs dfs -cat /wordcount/output/*
可以看到單詞數量統計結果。
三、MapReduce執行過程分析
下面看一下從job提交到執行完成這個過程是怎樣。
(1)客戶端提交任務
Client提交任務時會先到HDFS中查看目標文件的大小,瞭解要獲取的數據的規模,然後形成任務分配的規劃,例如:
a.txt 0-128M交給一個task,128-256M 交給一個task,b.txt 0-128M交給一個task,128-256M交給一個task ...,形成規劃文件job.split。
然後把規劃文件job.split、jar、配置文件xml提交給yarn(Hadoop集群資源管理器,負責為任務分配合適的服務器資源)
(2)啟動appmaster
注:appmaster是本次job的主管,負責maptask和reducetask的啟動、監控、協調管理工作。
yarn找一個合適的服務器來啟動appmaster,並把job.split、jar、xml交給它。
(3)啟動maptask
Appmaster啟動後,根據固化文件job.split中的分片信息啟動maptask,一個分片對應一個maptask。
分配maptask時,會盡量讓maptask在目標數據所在的datanode上執行。
(4)執行maptask
Maptask會一行行地讀目標文件,交給我們寫的map程序,讀一行就調一次map方法,map調用context.write把處理結果寫出去,保存到本機的一個結果文件,這個文件中的內容是分區且有序的。
分區的作用就是定義哪些key在一組,一個分區對應一個reducer。
(5)啟動reducetask
Maptask都運行完成後,appmaster再啟動reducetask,maptask的結果中有幾個分區就啟動幾個reducetask。
(6)執行reducetask
reducetask去讀取maptask的結果文件中自己對應的那個分區數據,例如reducetask_01去讀第一個分區中的數據。
reducetask把讀到的數據按key組織好,傳給reduce方法進行處理,處理結果寫到指定的輸出路徑。
四、實例1:自定義對象序列化
1、需求與實現思路
(1)需求
需要統計手機用戶流量日誌,日誌內容實例:
要把同一個用戶的上行流量、下行流量進行累加,並計算出綜合。
例如上面的13897230503有兩條記錄,就要對這兩條記錄進行累加,計算總和,得到:
13897230503,500,1600,2100
(2)實現思路
map
接收日誌的一行數據,key為行的偏移量,value為此行數據。
輸出時,應以手機號為key,value應為一個整體,包括:上行流量、下行流量、總流量。
手機號是字符串類型Text,而這個整體不能用基本數據類型表示,需要我們自定義一個bean對象,並且要實現可序列化。
key: 13897230503
value: < upFlow:100, dFlow:300, sumFlow:400 >
reduce
接收一個手機號標識的key,及這個手機號對應的bean對象集合。
例如:
key:
13897230503
value:
< upFlow:400, dFlow:1300, sumFlow:1700 >,
< upFlow:100, dFlow:300, sumFlow:400 >
迭代bean對象集合,累加各項,形成一個新的bean對象,例如:
< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >
最後輸出:
key: 13897230503
value: < upFlow:500, dFlow:1600, sumFlow:2100 >
2、代碼實踐
(1)創建項目
新建項目目錄serializebean,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在項目目錄的文件結構
(2)代碼
自定義bean:src/main/java/FlowBean
MapReduce程序:src/main/java/FlowCount
(3)編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件。
現在項目文件結構:
(4)運行
先把target中的jar上傳到Hadoop服務器,然後下載測試數據文件。
上傳到HDFS
hdfs dfs -mkdir -p /flowcount/input
hdfs dfs -put flowdata.log /flowcount/input
運行
hadoop jar mapreduce-serializebean-0.0.1-SNAPSHOT.jar FlowCount
/flowcount/input /flowcount/output2
檢查
hdfs dfs -cat /flowcount/output/*
五、實例2:自定義分區
1、需求與實現思路
(1)需求
還是以上個例子的手機用戶流量日誌為例:
在上個例子的統計需要基礎上添加一個新需求:按省份統計,不同省份的手機號放到不同的文件裡。
例如137表示屬於河北,138屬於河南,那麼在結果輸出時,他們分別在不同的文件中。
(2)實現思路
map和reduce的處理思路與上例相同,這裡需要多做2步:
自定義一個分區器Partitioner
根據手機號判斷屬於哪個分區。有幾個分區就有幾個reducetask,每個reducetask輸出一個文件,那麼,不同分區中的數據就寫入了不同的結果文件中。
在main程序中指定使用我們自定義的Partitioner即可
2、代碼實踐
(1)創建項目
新建項目目錄custom_partion,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在項目目錄的文件結構
(2)代碼
自定義bean:src/main/java/FlowBean.java
自定義分區器:src/main/java/ProvincePartitioner.java
這段代碼是本示例的重點,其中定義了一個hashmap,假設其是一個數據庫,定義了手機號和分區的關係。
getPartition取得手機號的前綴,到數據庫中獲取區號,如果沒在數據庫中,就指定其為“其它分區”(用4代表)
MapReduce程序:src/main/java/FlowCount.java
main程序中指定了使用自定義的分區器
job.setPartitionerClass(ProvincePartitioner.class);
(3)編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件
現在項目文件結構
(4)運行
先把target中的jar上傳到Hadoop服務器
運行
hadoop jar mapreduce-custompartion-0.0.1-SNAPSHOT.jar FlowCount
/flowcount/input /flowcount/output-part
檢查
hdfs dfs -ls /flowcount/output-part
六、實例3:計算出每組訂單中金額最大的記錄
1、需求與實現思路
(1)需求
有如下訂單數據:
需要求出每一個訂單中成交金額最大的一筆交易。
(2)實現思路
先介紹一個概念GroupingComparator組比較器,通過WordCount來理解它的作用。
WordCount中map處理完成後的結果數據是這樣的:
<good,1>
<good,1>
<good,1>
<is,1>
<is,1>
Reducer會把這些數據都讀進來,然後進行分組,把key相同的放在一組,形成這樣的形式:
<good, [1,1,1]>
<is, [1,1]>
然後對每一組數據調用一次reduce( key, Iterable, ...)方法。
其中分組的操作就需要用到GroupingComparator,對key進行比較,相同的放在一組。
注:上例中的Partitioner是屬於mapDuang的,GroupingComparator是屬於reduce端的。
下面看整體實現思路。
1)定義一個訂單bean
屬性包括:訂單號、金額
{ itemid, amount }
要實現可序列化,與比較方法compareTo,比較規則:訂單號不同的,按照訂單好比較,相同的,按照金額比較。
2)定義一個Partitioner
根據訂單號的hashcode分區,可以保證訂單號相同的在同一個分區,以便reduce中接收到同一個訂單的全部記錄。
同分區的數據是序的,這就用到了bean中的比較方法,可以讓訂單號相同的記錄按照金額從大到小排序。
在map方法中輸出數據時,key就是bean,value為null。
map的結果數據形式例如:
3)定義一個GroupingComparator
因為map的結果數據中key是bean,不是普通數據類型,所以需要使用自定義的比較器來分組,就使用bean中的訂單號來比較。
例如讀取到分區1的數據:
<{ Order_0000001 222.8 }, null>,
<{ Order_0000001 25.8 }, null>,
<{ Order_0000003 222.8 }, null>
進行比較,前兩條數據的訂單號相同,放入一組,默認是以第一條記錄的key作為這組記錄的key。
分組後的形式如下:
<{ Order_0000001 222.8 }, [null, null]>,
<{ Order_0000003 222.8 }, [null]>
在reduce方法中收到的每組記錄的key就是我們最終想要的結果,所以直接輸出到文件就可以了。
2、代碼實踐
(1)創建項目
新建項目目錄groupcomparator,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在項目目錄的文件結構
(2)代碼
**自定義bean:** src/main/java/OrderBean.java
自定義分區器:src/main/java/ItemIdPartitioner.java
自定義比較器:src/main/java/MyGroupingComparator.java
MapReduce程序:src/main/java/GroupSort.java
(3)編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件
現在項目文件結構
(4)運行
先把target中的jar上傳到Hadoop服務器
下載測試數據文件
上傳到HDFS
hdfs dfs -put orders.txt /
運行
hadoop jar mapreduce-groupcomparator-0.0.1-SNAPSHOT.jar GroupSo
rt /orders.txt /outputOrders
檢查
hdfs dfs -ls /outputOrders
hdfs dfs -cat /outputOrders/*
七、實例4:合併多個小文件
1、需求與實現思路
(1)需求
要計算的目標文件中有大量的小文件,會造成分配任務和資源的開銷比實際的計算開銷還打,這就產生了效率損耗。
需要先把一些小文件合併成一個大文件。
(2)實現思路
文件的讀取由map負責,在前面的示意圖中可以看到一個inputformat用來讀取文件,然後以key value形式傳遞給map方法。
我們要自定義文件的讀取過程,就需要了解其細節流程:
所以我們需要自定義一個inputformat和RecordReader。
Inputformat使用我們自己的RecordReader,RecordReader負責實現一次讀取一個完整文件封裝為key value。
map接收到文件內容,然後以文件名為key,以文件內容為value,向外輸出的格式要注意,要使用SequenceFileOutPutFormat(用來輸出對象)。
因為reduce收到的key value都是對象,不是普通的文本,reduce默認的輸出格式是TextOutputFormat,使用它的話,最終輸出的內容就是對象ID,所以要使用SequenceFileOutPutFormat進行輸出。
2、代碼實踐
(1)創建項目inputformat,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在項目目錄文件結構
(2)代碼
自定義inputform:src/main/java/MyInputFormat.java
createRecordReader方法中創建一個自定義的reader
自定義reader:src/main/java/MyRecordReader.java
其中有3個核心方法:nextKeyValue、getCurrentKey、getCurrentValue。
nextKeyValue負責生成要傳遞給map方法的key和value。getCurrentKey、getCurrentValue是實際獲取key和value的。所以RecordReader的核心機制就是:通過nextKeyValue生成key value,然後通過getCurrentKey和getCurrentValue來返回上面構造好的key value。這裡的nextKeyValue負責把整個文件內容作為value。
MapReduce程序:src/main/java/ManyToOne.java
main程序中指定使用我們自定義的MyInputFormat,輸出使用SequenceFileOutputFormat。
(3)編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件。
現在項目文件結構
(4)運行
先把target中的jar上傳到Hadoop服務器。
準備測試文件,把Hadoop目錄中的配置文件上傳到HDFS
hdfs dfs -mkdir /files
hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /files
運行
hadoop jar mapreduce-inputformat-0.0.1-SNAPSHOT.jar ManyToOne /
files /onefile
檢查
hdfs dfs -ls /onefile
八、實例5:分組輸出到多個文件
1、需求與實現思路
(1)需求
需要把相同訂單id的記錄放在一個文件中,並以訂單id命名。
(2)實現思路
這個需求可以直接使用MultipleOutputs這個類來實現。
默認情況下,每個reducer寫入一個文件,文件名由分區號命名,例如'part-r-00000',而 MultipleOutputs可以用key作為文件名,例如‘Order_0000001-r-00000’。
所以,思路就是map中處理每條記錄,以‘訂單id’為key,reduce中使用MultipleOutputs進行輸出,會自動以key為文件名,文件內容就是相同key的所有記錄。
例如‘Order_0000001-r-00000’的內容就是:
Order_0000001,Pdt_05,25.8
Order_0000001,Pdt_01,222.8
2、代碼實踐
(1)創建項目
新建項目目錄multioutput,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在項目目錄的文件結構
(2)代碼
MapReduce程序:src/main/java/MultipleOutputTest.java
(3)編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件。
現在項目文件結構
(4)運行
先把target中的jar上傳到Hadoop服務器
然後運行
hadoop jar mapreduce-multipleOutput-0.0.1-SNAPSHOT.jar Multiple
OutputTest /orders.txt /output-multi
檢查
hdfs dfs -ls /output-multi
九、MapReduce核心流程梳理
我們已經瞭解了MapReduce的大概流程:
(1)maptask從目標文件中讀取數據
(2)mapper的map方法處理每一條數據,輸出到文件中
(3)reducer讀取map的結果文件,進行分組,把每一組交給reduce方法進行處理,最後輸出到指定路徑。
這是最基本的流程,有助於快速理解MapReduce的工作方式。
通過上面的幾個示例,我們要經接觸了一些更深入的細節,例如mapper的inputform中還有RecordReader、reducer中還有GroupingComparator。
下面就看一下更加深入的處理流程。
1、Maptask中的處理流程
(1)讀文件流程
目標文件會被按照規劃文件進行切分,inputformat調用RecordReader讀取文件切片,RecordReader會生成key value對兒,傳遞給Mapper的mao方法。
(2)寫入結果文件的流程
從Mapper的map方法調用context.write之後,到形成結果數據文件這個過程是比較複雜的。
context.write不是直接寫入文件,而是把數據交給OutputCollector,OutputCollector把數據寫入‘環形緩衝區’。‘環形緩衝區’中的數據會進行排序。
因為緩衝區的大小是有限制的,所以每當快滿時(達到80%)就要把其中的數據寫出去,這個過程叫做數據溢出。
溢出到一個文件中,溢出過程會對這批數據進行分組、比較操作,然後吸入文件,所以溢出文件中的數據是分好區的,並且是有序的。每次溢出都會產生一個溢出數據文件,所以會有多個。
當map處理完全數據後,就會對各個溢出數據文件進行合併,每個文件中相同區的數據放在一起,並再次排序,最後得到一個整體的結果文件,其中是分區且有序的。
這樣就完成了map過程,讀數據過程和寫結果文件的過程聯合起來如下圖:
2、Reducetask的處理流程
reducetask去讀每個maptask產生的結果文件中自己所負責的分區數據,讀到自己本地。對多個數據文件進行合併排序,然後通過GroupingComparator進行分組,把相同key的數據放到一組。對每組數據調一次reduce方法,處理完成後寫入目標路徑文件。
3、整體流程
把map和reduce的過程聯合起來:
十、實例6:join操作
1、需求與實現思路
(1)需求
有2個數據文件:訂單數據、商品信息。
訂單數據表order
商品信息表product
需要用MapReduce程序來實現下面這個SQL查詢運算:
select o.id order_id, o.date, o.amount, p.id p_id, p.pname, p.c
ategory_id, p.price
from t_order o join t_product p on o.pid = p.id
(2)實現思路
SQL的執行結果是這樣的:
實際上就是給每條訂單記錄補充上商品表中的信息。
實現思路:
1)定義bean
把SQL執行結果中的各列封裝成一個bean對象,實現序列化。
bean中還要有一個另外的屬性flag,用來標識此對象的數據是訂單還是商品。
2)map處理
map會處理兩個文件中的數據,根據文件名可以知道當前這條數據是訂單還是商品。
對每條數據創建一個bean對象,設置對應的屬性,並標識flag(0代表order,1代表product)
以join的關聯項“productid”為key,bean為value進行輸出。
3)reduce處理
reduce方法接收到pid相同的一組bean對象。
遍歷bean對象集合,如果bean是訂單數據,就放入一個新的訂單集合中,如果是商品數據,就保存到一個商品bean中。然後遍歷那個新的訂單集合,使用商品bean的數據對每個訂單bean進行信息補全。
這樣就得到了完整的訂單及其商品信息。
2、代碼實踐
(1)創建項目
新建項目目錄jointest,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在項目目錄的文件結構
(2)代碼
**封裝bean:** src/main/java/InfoBean.java
MapReduce程序:src/main/java/JoinMR.java
(3)編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件。
現在項目文件結構
(4)運行
先把target中的jar上傳到Hadoop服務器
下載產品和訂單的測試數據文件
上傳到HDFS
hdfs dfs -mkdir -p /jointest/input
hdfs dfs -put order.txt /jointest/input
hdfs dfs -put product.txt /jointest/input
運行
hadoop jar joinmr.jar com.dys.mapreducetest.join.JoinMR /jointe
st/input /jointest/output
檢查
hdfs dfs -cat /jointest/output/*
十一、實例7:計算出用戶間的共同好友
1、需求與實現思路
(1)需求
下面是用戶的好友關係列表,每一行代表一個用戶和他的好友列表。
需要求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰。
例如從前2天記錄中可以看出,C、E是A、B的共同好友,最終的形式如下:
(2)實現思路
之前的示例中都是一個MapReduce計算出來的,這裡我們使用2個MapReduce來實現。
1)第1個MapReduce
map
找出每個用戶都是誰的好友,例如:
讀一行A:B,C,D,F,E,O(A的好友有這些,反過來拆開,這些人中的每一個都是A的好友)
輸出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>
再讀一行B:A,C,E,K
輸出<A,B> <C,B> <E,B> <K,B>
……
reduce
key相同的會分到一組,例如:
<C,A><C,B><C,E><C,F><C,G>......
Key:C
value: [ A, B, E, F, G ]
意義是:C是這些用戶的好友。
遍歷value就可以得到:
A B 有共同好友C
A E 有共同好友C
...
B E有共同好友 C
B F有共同好友 C
輸出:
<A-B,C>
<A-E,C>
<A-F,C>
<A-G,C>
<B-E,C>
<B-F,C>
.....
2)第2個MapReduce
對上一步的輸出結果進行計算。
map
讀出上一步的結果數據,組織成key value直接輸出
例如:
讀入一行<A-B,C>
直接輸出<A-B,C>
reduce
讀入數據,key相同的在一組
<A-B,C><A-B,F><A-B,G>......
輸出:
A-B C,F,G,.....
這樣就得出了兩個用戶間的共同好友列表
2、代碼實踐
(1)創建項目
新建項目目錄jointest,其中新建文件pom.xml,內容:
然後創建源碼目錄src/main/java
現在項目目錄的文件結構
(2)代碼
第一步的MapReduce程序:src/main/java/StepFirst.java
第二步的MapReduce程序:src/main/java/StepSecond.java
(3)編譯打包
在pom.xml所在目錄下執行打包命令:
mvn package
執行完成後,會自動生成target目錄,其中有打包好的jar文件。
現在項目文件結構
(4)運行
先把target中的jar上傳到Hadoop服務器
下載測試數據文件
上傳到HDFS
hdfs dfs -mkdir -p /friends/input
hdfs dfs -put friendsdata.txt /friends/input
運行第一步
hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepFirst /frie
nds/input/friendsdata.txt /friends/output01
運行第二步
hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepSecond /fri
ends/output01/part-r-00000 /friends/output02
查看結果
hdfs dfs -ls /friends/output02hdfs dfs -cat /friends/output02/*
十二、小結
MapReduce的基礎內容介紹完了,希望可以幫助您快速熟悉MapReduce的工作原理和開發方法。如有批評與建議(例如內容有誤、不足的地方、改進建議等),歡迎留言討論。
End.
文章轉載自:DBAplus社群