'MapReduce 編程模型 & WordCount 示例'

MapReduce 大數據 Java 並行計算 HDFS Google 文章 達升笑講故事 2019-08-04
"


"


MapReduce 編程模型 & WordCount 示例


學習大數據接觸到的第一個編程思想 MapReduce。

前言

之前在學習大數據的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大數據或者說正在學習大數據的朋友。如果你看到裡面的東西,讓你知道了它,這也是一種進步嘛。說不定就開啟了你的另一扇大門呢?

先來看一個問題

在講 MapReduce 之前,我們先來看一個問題。我們都知道,在大數據場景中,最先讓人瞭解到的就是數據量大。當數據量大了以後,我們在處理一些問題的時候,可能就沒辦法按照以前我們傳統的方式去解決問題。

我們以一個簡單的單詞計數來看這個問題。

比如現在我們有一個文件,就10M,裡面存放的是一篇英文文檔,我們現在的需求就是計算單詞出現的次數。

按照我們以前寫 Java 代碼的套路來做,大概就是讀取文件,把數據加載到內存,然後new 一個map來存最後的結果。key 就是單詞,value 就是單詞出現的次數。

然後從文件中讀取一行數據,然後對這行數據按空格進行切割,然後對切割後的一個一個的單詞進行處理,看map 中是否存在,存在就 value + 1,不存在就設置 value 為 1 。

然後再讀取一行數據重複上面的操作,直到結束。很簡單吧。

是的,沒問題,剛才文件是 10M,處理完成秒秒鐘的事情,但是現在我的文件是 2T 的大小,看清楚呃,是兩個 T 的文件需要處理,那你現在要怎麼做?還去加載到內存麼?

想想你公司的機器配置,內存多大,8G,16G,32G ...,頂起天 128G 吧。先不說多大,再想想現在內存價格是多少,128G 的內存得花多少錢。很顯然,現在這麼玩兒,玩不了吧。

但是,現在一般你公司的機器都還是有不少臺吧。那麼如果說我們現在把這些機器組成一個 N 節點的集群,然後把這 2T 的文件切分成很多個小文件,然後丟到這些機器上面去計算執行統計,最後再進行一個彙總,是不是就解決了上面的內存不足的問題。

MapReduce 思想

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算,源於 Google 一篇論文,它充分借鑑了 “分而治之” 的思想,將一個數據處理過程拆分為主要的Map(映射)與Reduce(化簡)兩步。

對比上面的例子來說,Map 階段就是每個機器處理切好的數據片的階段,Reduce 階段則是最後統計彙總的階段。

那麼,針對前面說的例子大概可以用下面這個圖來描述它:

"


MapReduce 編程模型 & WordCount 示例


學習大數據接觸到的第一個編程思想 MapReduce。

前言

之前在學習大數據的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大數據或者說正在學習大數據的朋友。如果你看到裡面的東西,讓你知道了它,這也是一種進步嘛。說不定就開啟了你的另一扇大門呢?

先來看一個問題

在講 MapReduce 之前,我們先來看一個問題。我們都知道,在大數據場景中,最先讓人瞭解到的就是數據量大。當數據量大了以後,我們在處理一些問題的時候,可能就沒辦法按照以前我們傳統的方式去解決問題。

我們以一個簡單的單詞計數來看這個問題。

比如現在我們有一個文件,就10M,裡面存放的是一篇英文文檔,我們現在的需求就是計算單詞出現的次數。

按照我們以前寫 Java 代碼的套路來做,大概就是讀取文件,把數據加載到內存,然後new 一個map來存最後的結果。key 就是單詞,value 就是單詞出現的次數。

然後從文件中讀取一行數據,然後對這行數據按空格進行切割,然後對切割後的一個一個的單詞進行處理,看map 中是否存在,存在就 value + 1,不存在就設置 value 為 1 。

然後再讀取一行數據重複上面的操作,直到結束。很簡單吧。

是的,沒問題,剛才文件是 10M,處理完成秒秒鐘的事情,但是現在我的文件是 2T 的大小,看清楚呃,是兩個 T 的文件需要處理,那你現在要怎麼做?還去加載到內存麼?

想想你公司的機器配置,內存多大,8G,16G,32G ...,頂起天 128G 吧。先不說多大,再想想現在內存價格是多少,128G 的內存得花多少錢。很顯然,現在這麼玩兒,玩不了吧。

但是,現在一般你公司的機器都還是有不少臺吧。那麼如果說我們現在把這些機器組成一個 N 節點的集群,然後把這 2T 的文件切分成很多個小文件,然後丟到這些機器上面去計算執行統計,最後再進行一個彙總,是不是就解決了上面的內存不足的問題。

MapReduce 思想

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算,源於 Google 一篇論文,它充分借鑑了 “分而治之” 的思想,將一個數據處理過程拆分為主要的Map(映射)與Reduce(化簡)兩步。

對比上面的例子來說,Map 階段就是每個機器處理切好的數據片的階段,Reduce 階段則是最後統計彙總的階段。

那麼,針對前面說的例子大概可以用下面這個圖來描述它:

MapReduce 編程模型 & WordCount 示例


簡單說一下上面的思路:

第一步:把兩個T 的文件分成若干個文件塊(block)分散存在整個集群上,比如128M 一個。

第二步:在每臺機器上運行一個map task 任務,分別對自己機器上的文件進行統計:

1.先把數據加載進內存,然後一行一行的對數據進行讀取,按照空格來進行切割。

2.用一個 HashMap 來存儲數據,內容為 <單詞,數量>

3.當自己本地的數據處理完成以後,將數據進行輸出準備

4.輸出數據之前,先把HashMap 按照首字母範圍分成 3 個HashMap5.將3個 HashMap 分別發送給 3個 Reduce task 進行處理,分發的時候,同一段單詞的數據,就會進入同一個 Reduce task 進行處理,保證數據統計的完整性。

第三步: Reduce task 把收到的數據進行彙總,然後輸出到 hdfs 文件系統進程存儲。

上面的過程可能遇到的問題

上面我們只是關心了我們業務邏輯的實現,其實系統一旦做成分佈式以後,會面臨非常多的複雜問題,比如:

•你的 Map task 如何進行任務分配?

•你的 Reduce task 如何分配要處理的數據任務?

•Map task 和 Reduce task 之間如何進行銜接,什麼時候去啟動Reduce Task 呀?

•如果 Map task 運行失敗了,怎麼處理?

•Map task 還要去維護自己要發送的數據分區,是不是也太麻煩了。

•等等等等等

為什麼要用 MapReduce

可見在程序由單機版擴成分佈式時,會引入大量的複雜工作。為了提高開發效率,可以將分佈式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。

而 MapReduce 就是這樣一個分佈式程序的通用框架。

WordCount 示例

用一個代碼示例來演示,它需要3個東西,一個是map task ,一個是 reduce task ,還有就是啟動類,不然怎麼關聯他們的關係呢。

首先是 map task :

package com.zhouq.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* KEYIN 默認情況下,是MR 框架中讀取到的一行文本的起始偏移量,long 類型
* 在hadoop 中有自己更精簡的序列化接口,我們不直接用Long ,而是用 LongWritable
* VALUEIN : 默認情況下,是MR 中讀取到的一行文本內容,String ,也有自己的類型 Text 類型
* <p>
* KEYOUT : 是用戶自定義的邏輯處理完成後的自定義輸出數據的key ,我們這裡是單詞,類型為string 同上,Text
* <p>
* VALUEOUT: 是用戶自定義的邏輯處理完成後的自定義輸出value 類型,我們這裡是單詞數量Integer,同上,Integer 也有自己的類型 IntWritable
* <p>
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map 階段的業務邏輯就寫在map 方法內
* maptask 會對每一行輸入數據 就調用一次我們自定義的map 方法。
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到輸入的這行數據
String line = value.toString();
//根據空格進行分割得到這行的單詞
String[] words = line.split(" ");
//將單詞輸出為 <word,1>
for (String word : words) {
//將單詞作為key ,將次數 做為value輸出,
// 這樣也利於後面的數據分發,可以根據單詞進行分發,
// 以便於相同的單詞落到相同的reduce task 上,方便統計
context.write(new Text(word), new IntWritable(1));
}
}
}

接下來是 reduce task 邏輯:

/**
* KEYIN VALUEIN 對於map 階段輸出的KEYOUT VALUEOUT
* <p>
* KEYOUT :是自定義 reduce 邏輯處理結果的key
* VALUEOUT : 是自定義reduce 邏輯處理結果的 value
*/
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* <zhouq,1>,<zhouq,1>,<zhouq,2> ......
* 入參key 是一組單詞的kv對 的 key
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//拿到當前傳送進來的 單詞
// String word = key.toString();
//
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
//這裡的key 就是單詞
context.write(key, new IntWritable(count));
}
}

最後是啟動類:

/**
* wc 啟動類
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// mapreduce.framework.name 配置成 local 就是本地運行模式,默認就是local
// 所謂的集群運行模式 yarn ,就是提交程序到yarn 上. 要想集群運行必須指定下面三個配置.
// conf.set("mapreduce.framework.name", "yarn");
// conf.set("yarn.resoucemanager.hostname", "mini1");
//conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/");
Job job = Job.getInstance(conf);
//指定本程序的jar 包 所在的本地路徑
job.setJarByClass(WordCountDriver.class);
//指定本次業務的mepper 和 reduce 業務類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordcountReduce.class);
//指定mapper 輸出的 key value 類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//指定 最終輸出的 kv 類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入原始文件所在目錄
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定job 輸出的文件目錄
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
}

配置啟動類參數:填寫輸入目錄和輸出目錄,注意輸出目錄不能存在,不然會執行失敗的。

"


MapReduce 編程模型 & WordCount 示例


學習大數據接觸到的第一個編程思想 MapReduce。

前言

之前在學習大數據的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大數據或者說正在學習大數據的朋友。如果你看到裡面的東西,讓你知道了它,這也是一種進步嘛。說不定就開啟了你的另一扇大門呢?

先來看一個問題

在講 MapReduce 之前,我們先來看一個問題。我們都知道,在大數據場景中,最先讓人瞭解到的就是數據量大。當數據量大了以後,我們在處理一些問題的時候,可能就沒辦法按照以前我們傳統的方式去解決問題。

我們以一個簡單的單詞計數來看這個問題。

比如現在我們有一個文件,就10M,裡面存放的是一篇英文文檔,我們現在的需求就是計算單詞出現的次數。

按照我們以前寫 Java 代碼的套路來做,大概就是讀取文件,把數據加載到內存,然後new 一個map來存最後的結果。key 就是單詞,value 就是單詞出現的次數。

然後從文件中讀取一行數據,然後對這行數據按空格進行切割,然後對切割後的一個一個的單詞進行處理,看map 中是否存在,存在就 value + 1,不存在就設置 value 為 1 。

然後再讀取一行數據重複上面的操作,直到結束。很簡單吧。

是的,沒問題,剛才文件是 10M,處理完成秒秒鐘的事情,但是現在我的文件是 2T 的大小,看清楚呃,是兩個 T 的文件需要處理,那你現在要怎麼做?還去加載到內存麼?

想想你公司的機器配置,內存多大,8G,16G,32G ...,頂起天 128G 吧。先不說多大,再想想現在內存價格是多少,128G 的內存得花多少錢。很顯然,現在這麼玩兒,玩不了吧。

但是,現在一般你公司的機器都還是有不少臺吧。那麼如果說我們現在把這些機器組成一個 N 節點的集群,然後把這 2T 的文件切分成很多個小文件,然後丟到這些機器上面去計算執行統計,最後再進行一個彙總,是不是就解決了上面的內存不足的問題。

MapReduce 思想

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算,源於 Google 一篇論文,它充分借鑑了 “分而治之” 的思想,將一個數據處理過程拆分為主要的Map(映射)與Reduce(化簡)兩步。

對比上面的例子來說,Map 階段就是每個機器處理切好的數據片的階段,Reduce 階段則是最後統計彙總的階段。

那麼,針對前面說的例子大概可以用下面這個圖來描述它:

MapReduce 編程模型 & WordCount 示例


簡單說一下上面的思路:

第一步:把兩個T 的文件分成若干個文件塊(block)分散存在整個集群上,比如128M 一個。

第二步:在每臺機器上運行一個map task 任務,分別對自己機器上的文件進行統計:

1.先把數據加載進內存,然後一行一行的對數據進行讀取,按照空格來進行切割。

2.用一個 HashMap 來存儲數據,內容為 <單詞,數量>

3.當自己本地的數據處理完成以後,將數據進行輸出準備

4.輸出數據之前,先把HashMap 按照首字母範圍分成 3 個HashMap5.將3個 HashMap 分別發送給 3個 Reduce task 進行處理,分發的時候,同一段單詞的數據,就會進入同一個 Reduce task 進行處理,保證數據統計的完整性。

第三步: Reduce task 把收到的數據進行彙總,然後輸出到 hdfs 文件系統進程存儲。

上面的過程可能遇到的問題

上面我們只是關心了我們業務邏輯的實現,其實系統一旦做成分佈式以後,會面臨非常多的複雜問題,比如:

•你的 Map task 如何進行任務分配?

•你的 Reduce task 如何分配要處理的數據任務?

•Map task 和 Reduce task 之間如何進行銜接,什麼時候去啟動Reduce Task 呀?

•如果 Map task 運行失敗了,怎麼處理?

•Map task 還要去維護自己要發送的數據分區,是不是也太麻煩了。

•等等等等等

為什麼要用 MapReduce

可見在程序由單機版擴成分佈式時,會引入大量的複雜工作。為了提高開發效率,可以將分佈式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。

而 MapReduce 就是這樣一個分佈式程序的通用框架。

WordCount 示例

用一個代碼示例來演示,它需要3個東西,一個是map task ,一個是 reduce task ,還有就是啟動類,不然怎麼關聯他們的關係呢。

首先是 map task :

package com.zhouq.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* KEYIN 默認情況下,是MR 框架中讀取到的一行文本的起始偏移量,long 類型
* 在hadoop 中有自己更精簡的序列化接口,我們不直接用Long ,而是用 LongWritable
* VALUEIN : 默認情況下,是MR 中讀取到的一行文本內容,String ,也有自己的類型 Text 類型
* <p>
* KEYOUT : 是用戶自定義的邏輯處理完成後的自定義輸出數據的key ,我們這裡是單詞,類型為string 同上,Text
* <p>
* VALUEOUT: 是用戶自定義的邏輯處理完成後的自定義輸出value 類型,我們這裡是單詞數量Integer,同上,Integer 也有自己的類型 IntWritable
* <p>
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map 階段的業務邏輯就寫在map 方法內
* maptask 會對每一行輸入數據 就調用一次我們自定義的map 方法。
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到輸入的這行數據
String line = value.toString();
//根據空格進行分割得到這行的單詞
String[] words = line.split(" ");
//將單詞輸出為 <word,1>
for (String word : words) {
//將單詞作為key ,將次數 做為value輸出,
// 這樣也利於後面的數據分發,可以根據單詞進行分發,
// 以便於相同的單詞落到相同的reduce task 上,方便統計
context.write(new Text(word), new IntWritable(1));
}
}
}

接下來是 reduce task 邏輯:

/**
* KEYIN VALUEIN 對於map 階段輸出的KEYOUT VALUEOUT
* <p>
* KEYOUT :是自定義 reduce 邏輯處理結果的key
* VALUEOUT : 是自定義reduce 邏輯處理結果的 value
*/
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* <zhouq,1>,<zhouq,1>,<zhouq,2> ......
* 入參key 是一組單詞的kv對 的 key
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//拿到當前傳送進來的 單詞
// String word = key.toString();
//
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
//這裡的key 就是單詞
context.write(key, new IntWritable(count));
}
}

最後是啟動類:

/**
* wc 啟動類
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// mapreduce.framework.name 配置成 local 就是本地運行模式,默認就是local
// 所謂的集群運行模式 yarn ,就是提交程序到yarn 上. 要想集群運行必須指定下面三個配置.
// conf.set("mapreduce.framework.name", "yarn");
// conf.set("yarn.resoucemanager.hostname", "mini1");
//conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/");
Job job = Job.getInstance(conf);
//指定本程序的jar 包 所在的本地路徑
job.setJarByClass(WordCountDriver.class);
//指定本次業務的mepper 和 reduce 業務類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordcountReduce.class);
//指定mapper 輸出的 key value 類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//指定 最終輸出的 kv 類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入原始文件所在目錄
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定job 輸出的文件目錄
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
}

配置啟動類參數:填寫輸入目錄和輸出目錄,注意輸出目錄不能存在,不然會執行失敗的。

MapReduce 編程模型 & WordCount 示例


執行我們就用編輯器執行,用本地模式,不提交到hadoop 集群上,執行完成後,去到輸出目錄下可以看到這些文件:

"


MapReduce 編程模型 & WordCount 示例


學習大數據接觸到的第一個編程思想 MapReduce。

前言

之前在學習大數據的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大數據或者說正在學習大數據的朋友。如果你看到裡面的東西,讓你知道了它,這也是一種進步嘛。說不定就開啟了你的另一扇大門呢?

先來看一個問題

在講 MapReduce 之前,我們先來看一個問題。我們都知道,在大數據場景中,最先讓人瞭解到的就是數據量大。當數據量大了以後,我們在處理一些問題的時候,可能就沒辦法按照以前我們傳統的方式去解決問題。

我們以一個簡單的單詞計數來看這個問題。

比如現在我們有一個文件,就10M,裡面存放的是一篇英文文檔,我們現在的需求就是計算單詞出現的次數。

按照我們以前寫 Java 代碼的套路來做,大概就是讀取文件,把數據加載到內存,然後new 一個map來存最後的結果。key 就是單詞,value 就是單詞出現的次數。

然後從文件中讀取一行數據,然後對這行數據按空格進行切割,然後對切割後的一個一個的單詞進行處理,看map 中是否存在,存在就 value + 1,不存在就設置 value 為 1 。

然後再讀取一行數據重複上面的操作,直到結束。很簡單吧。

是的,沒問題,剛才文件是 10M,處理完成秒秒鐘的事情,但是現在我的文件是 2T 的大小,看清楚呃,是兩個 T 的文件需要處理,那你現在要怎麼做?還去加載到內存麼?

想想你公司的機器配置,內存多大,8G,16G,32G ...,頂起天 128G 吧。先不說多大,再想想現在內存價格是多少,128G 的內存得花多少錢。很顯然,現在這麼玩兒,玩不了吧。

但是,現在一般你公司的機器都還是有不少臺吧。那麼如果說我們現在把這些機器組成一個 N 節點的集群,然後把這 2T 的文件切分成很多個小文件,然後丟到這些機器上面去計算執行統計,最後再進行一個彙總,是不是就解決了上面的內存不足的問題。

MapReduce 思想

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算,源於 Google 一篇論文,它充分借鑑了 “分而治之” 的思想,將一個數據處理過程拆分為主要的Map(映射)與Reduce(化簡)兩步。

對比上面的例子來說,Map 階段就是每個機器處理切好的數據片的階段,Reduce 階段則是最後統計彙總的階段。

那麼,針對前面說的例子大概可以用下面這個圖來描述它:

MapReduce 編程模型 & WordCount 示例


簡單說一下上面的思路:

第一步:把兩個T 的文件分成若干個文件塊(block)分散存在整個集群上,比如128M 一個。

第二步:在每臺機器上運行一個map task 任務,分別對自己機器上的文件進行統計:

1.先把數據加載進內存,然後一行一行的對數據進行讀取,按照空格來進行切割。

2.用一個 HashMap 來存儲數據,內容為 <單詞,數量>

3.當自己本地的數據處理完成以後,將數據進行輸出準備

4.輸出數據之前,先把HashMap 按照首字母範圍分成 3 個HashMap5.將3個 HashMap 分別發送給 3個 Reduce task 進行處理,分發的時候,同一段單詞的數據,就會進入同一個 Reduce task 進行處理,保證數據統計的完整性。

第三步: Reduce task 把收到的數據進行彙總,然後輸出到 hdfs 文件系統進程存儲。

上面的過程可能遇到的問題

上面我們只是關心了我們業務邏輯的實現,其實系統一旦做成分佈式以後,會面臨非常多的複雜問題,比如:

•你的 Map task 如何進行任務分配?

•你的 Reduce task 如何分配要處理的數據任務?

•Map task 和 Reduce task 之間如何進行銜接,什麼時候去啟動Reduce Task 呀?

•如果 Map task 運行失敗了,怎麼處理?

•Map task 還要去維護自己要發送的數據分區,是不是也太麻煩了。

•等等等等等

為什麼要用 MapReduce

可見在程序由單機版擴成分佈式時,會引入大量的複雜工作。為了提高開發效率,可以將分佈式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。

而 MapReduce 就是這樣一個分佈式程序的通用框架。

WordCount 示例

用一個代碼示例來演示,它需要3個東西,一個是map task ,一個是 reduce task ,還有就是啟動類,不然怎麼關聯他們的關係呢。

首先是 map task :

package com.zhouq.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* KEYIN 默認情況下,是MR 框架中讀取到的一行文本的起始偏移量,long 類型
* 在hadoop 中有自己更精簡的序列化接口,我們不直接用Long ,而是用 LongWritable
* VALUEIN : 默認情況下,是MR 中讀取到的一行文本內容,String ,也有自己的類型 Text 類型
* <p>
* KEYOUT : 是用戶自定義的邏輯處理完成後的自定義輸出數據的key ,我們這裡是單詞,類型為string 同上,Text
* <p>
* VALUEOUT: 是用戶自定義的邏輯處理完成後的自定義輸出value 類型,我們這裡是單詞數量Integer,同上,Integer 也有自己的類型 IntWritable
* <p>
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map 階段的業務邏輯就寫在map 方法內
* maptask 會對每一行輸入數據 就調用一次我們自定義的map 方法。
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到輸入的這行數據
String line = value.toString();
//根據空格進行分割得到這行的單詞
String[] words = line.split(" ");
//將單詞輸出為 <word,1>
for (String word : words) {
//將單詞作為key ,將次數 做為value輸出,
// 這樣也利於後面的數據分發,可以根據單詞進行分發,
// 以便於相同的單詞落到相同的reduce task 上,方便統計
context.write(new Text(word), new IntWritable(1));
}
}
}

接下來是 reduce task 邏輯:

/**
* KEYIN VALUEIN 對於map 階段輸出的KEYOUT VALUEOUT
* <p>
* KEYOUT :是自定義 reduce 邏輯處理結果的key
* VALUEOUT : 是自定義reduce 邏輯處理結果的 value
*/
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* <zhouq,1>,<zhouq,1>,<zhouq,2> ......
* 入參key 是一組單詞的kv對 的 key
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//拿到當前傳送進來的 單詞
// String word = key.toString();
//
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
//這裡的key 就是單詞
context.write(key, new IntWritable(count));
}
}

最後是啟動類:

/**
* wc 啟動類
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// mapreduce.framework.name 配置成 local 就是本地運行模式,默認就是local
// 所謂的集群運行模式 yarn ,就是提交程序到yarn 上. 要想集群運行必須指定下面三個配置.
// conf.set("mapreduce.framework.name", "yarn");
// conf.set("yarn.resoucemanager.hostname", "mini1");
//conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/");
Job job = Job.getInstance(conf);
//指定本程序的jar 包 所在的本地路徑
job.setJarByClass(WordCountDriver.class);
//指定本次業務的mepper 和 reduce 業務類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordcountReduce.class);
//指定mapper 輸出的 key value 類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//指定 最終輸出的 kv 類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入原始文件所在目錄
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定job 輸出的文件目錄
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
}

配置啟動類參數:填寫輸入目錄和輸出目錄,注意輸出目錄不能存在,不然會執行失敗的。

MapReduce 編程模型 & WordCount 示例


執行我們就用編輯器執行,用本地模式,不提交到hadoop 集群上,執行完成後,去到輸出目錄下可以看到這些文件:

MapReduce 編程模型 & WordCount 示例


然後輸出一下 part-r-00000 這個文件:

"


MapReduce 編程模型 & WordCount 示例


學習大數據接觸到的第一個編程思想 MapReduce。

前言

之前在學習大數據的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大數據或者說正在學習大數據的朋友。如果你看到裡面的東西,讓你知道了它,這也是一種進步嘛。說不定就開啟了你的另一扇大門呢?

先來看一個問題

在講 MapReduce 之前,我們先來看一個問題。我們都知道,在大數據場景中,最先讓人瞭解到的就是數據量大。當數據量大了以後,我們在處理一些問題的時候,可能就沒辦法按照以前我們傳統的方式去解決問題。

我們以一個簡單的單詞計數來看這個問題。

比如現在我們有一個文件,就10M,裡面存放的是一篇英文文檔,我們現在的需求就是計算單詞出現的次數。

按照我們以前寫 Java 代碼的套路來做,大概就是讀取文件,把數據加載到內存,然後new 一個map來存最後的結果。key 就是單詞,value 就是單詞出現的次數。

然後從文件中讀取一行數據,然後對這行數據按空格進行切割,然後對切割後的一個一個的單詞進行處理,看map 中是否存在,存在就 value + 1,不存在就設置 value 為 1 。

然後再讀取一行數據重複上面的操作,直到結束。很簡單吧。

是的,沒問題,剛才文件是 10M,處理完成秒秒鐘的事情,但是現在我的文件是 2T 的大小,看清楚呃,是兩個 T 的文件需要處理,那你現在要怎麼做?還去加載到內存麼?

想想你公司的機器配置,內存多大,8G,16G,32G ...,頂起天 128G 吧。先不說多大,再想想現在內存價格是多少,128G 的內存得花多少錢。很顯然,現在這麼玩兒,玩不了吧。

但是,現在一般你公司的機器都還是有不少臺吧。那麼如果說我們現在把這些機器組成一個 N 節點的集群,然後把這 2T 的文件切分成很多個小文件,然後丟到這些機器上面去計算執行統計,最後再進行一個彙總,是不是就解決了上面的內存不足的問題。

MapReduce 思想

MapReduce 是一種編程模型,用於大規模數據集(大於1TB)的並行運算,源於 Google 一篇論文,它充分借鑑了 “分而治之” 的思想,將一個數據處理過程拆分為主要的Map(映射)與Reduce(化簡)兩步。

對比上面的例子來說,Map 階段就是每個機器處理切好的數據片的階段,Reduce 階段則是最後統計彙總的階段。

那麼,針對前面說的例子大概可以用下面這個圖來描述它:

MapReduce 編程模型 & WordCount 示例


簡單說一下上面的思路:

第一步:把兩個T 的文件分成若干個文件塊(block)分散存在整個集群上,比如128M 一個。

第二步:在每臺機器上運行一個map task 任務,分別對自己機器上的文件進行統計:

1.先把數據加載進內存,然後一行一行的對數據進行讀取,按照空格來進行切割。

2.用一個 HashMap 來存儲數據,內容為 <單詞,數量>

3.當自己本地的數據處理完成以後,將數據進行輸出準備

4.輸出數據之前,先把HashMap 按照首字母範圍分成 3 個HashMap5.將3個 HashMap 分別發送給 3個 Reduce task 進行處理,分發的時候,同一段單詞的數據,就會進入同一個 Reduce task 進行處理,保證數據統計的完整性。

第三步: Reduce task 把收到的數據進行彙總,然後輸出到 hdfs 文件系統進程存儲。

上面的過程可能遇到的問題

上面我們只是關心了我們業務邏輯的實現,其實系統一旦做成分佈式以後,會面臨非常多的複雜問題,比如:

•你的 Map task 如何進行任務分配?

•你的 Reduce task 如何分配要處理的數據任務?

•Map task 和 Reduce task 之間如何進行銜接,什麼時候去啟動Reduce Task 呀?

•如果 Map task 運行失敗了,怎麼處理?

•Map task 還要去維護自己要發送的數據分區,是不是也太麻煩了。

•等等等等等

為什麼要用 MapReduce

可見在程序由單機版擴成分佈式時,會引入大量的複雜工作。為了提高開發效率,可以將分佈式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。

而 MapReduce 就是這樣一個分佈式程序的通用框架。

WordCount 示例

用一個代碼示例來演示,它需要3個東西,一個是map task ,一個是 reduce task ,還有就是啟動類,不然怎麼關聯他們的關係呢。

首先是 map task :

package com.zhouq.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* KEYIN 默認情況下,是MR 框架中讀取到的一行文本的起始偏移量,long 類型
* 在hadoop 中有自己更精簡的序列化接口,我們不直接用Long ,而是用 LongWritable
* VALUEIN : 默認情況下,是MR 中讀取到的一行文本內容,String ,也有自己的類型 Text 類型
* <p>
* KEYOUT : 是用戶自定義的邏輯處理完成後的自定義輸出數據的key ,我們這裡是單詞,類型為string 同上,Text
* <p>
* VALUEOUT: 是用戶自定義的邏輯處理完成後的自定義輸出value 類型,我們這裡是單詞數量Integer,同上,Integer 也有自己的類型 IntWritable
* <p>
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map 階段的業務邏輯就寫在map 方法內
* maptask 會對每一行輸入數據 就調用一次我們自定義的map 方法。
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到輸入的這行數據
String line = value.toString();
//根據空格進行分割得到這行的單詞
String[] words = line.split(" ");
//將單詞輸出為 <word,1>
for (String word : words) {
//將單詞作為key ,將次數 做為value輸出,
// 這樣也利於後面的數據分發,可以根據單詞進行分發,
// 以便於相同的單詞落到相同的reduce task 上,方便統計
context.write(new Text(word), new IntWritable(1));
}
}
}

接下來是 reduce task 邏輯:

/**
* KEYIN VALUEIN 對於map 階段輸出的KEYOUT VALUEOUT
* <p>
* KEYOUT :是自定義 reduce 邏輯處理結果的key
* VALUEOUT : 是自定義reduce 邏輯處理結果的 value
*/
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* <zhouq,1>,<zhouq,1>,<zhouq,2> ......
* 入參key 是一組單詞的kv對 的 key
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//拿到當前傳送進來的 單詞
// String word = key.toString();
//
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
//這裡的key 就是單詞
context.write(key, new IntWritable(count));
}
}

最後是啟動類:

/**
* wc 啟動類
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// mapreduce.framework.name 配置成 local 就是本地運行模式,默認就是local
// 所謂的集群運行模式 yarn ,就是提交程序到yarn 上. 要想集群運行必須指定下面三個配置.
// conf.set("mapreduce.framework.name", "yarn");
// conf.set("yarn.resoucemanager.hostname", "mini1");
//conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/");
Job job = Job.getInstance(conf);
//指定本程序的jar 包 所在的本地路徑
job.setJarByClass(WordCountDriver.class);
//指定本次業務的mepper 和 reduce 業務類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordcountReduce.class);
//指定mapper 輸出的 key value 類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//指定 最終輸出的 kv 類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入原始文件所在目錄
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定job 輸出的文件目錄
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
}

配置啟動類參數:填寫輸入目錄和輸出目錄,注意輸出目錄不能存在,不然會執行失敗的。

MapReduce 編程模型 & WordCount 示例


執行我們就用編輯器執行,用本地模式,不提交到hadoop 集群上,執行完成後,去到輸出目錄下可以看到這些文件:

MapReduce 編程模型 & WordCount 示例


然後輸出一下 part-r-00000 這個文件:

MapReduce 編程模型 & WordCount 示例


代碼地址:https://github.com/heyxyw/bigdata/blob/master/bigdatastudy/mapreduce/src/main/java/com/zhouq/mr/WordCountDriver.java

最後

希望對你有幫助。後面將會去講 MapReduce 是如何去運行的。

原文地址:https://www.cnblogs.com/justdojava/p/11271080.html

鳴謝作者

"

相關推薦

推薦中...