'這是你見過的史上最全的Spark知識學習總結嗎?'

Spark HDFS Hadoop Numbers Hive 大數據 程序員高級碼農 2019-08-14
"

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分佈式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分佈在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分佈式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
// 將文本分割成單詞RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//將單詞RDD轉換為(單詞,1)鍵值對RDD
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
//對wordPair 進行按鍵計數
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer +integer2;
}
});
// 到這裡為止,就得到了每個單詞出現的次數
// 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
// wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
// 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

// 進行key-value的反轉映射
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
return new Tuple2<Integer, String>(s._2,s._1);
}
});
// 按照key進行排序
JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
// 再次將value-key進行反轉映射
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
return new Tuple2<String, Integer>(s._2,s._1);
}
});
// 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
// 打印出來
sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times.");
}
});
sc.close();
}
}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
sortedWordCount.foreach(s->System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times."));
sc.close();
}
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt")
val words = lines.flatMap{line => line.split(" ")}
val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
val countWord = wordCounts.map{word =>(word._2,word._1)}
val sortedCountWord = countWord.sortByKey(false)
val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
sortedWordCount.foreach(s=>
{
println("word \\""+s._1+ "\\" appears "+s._2+" times.")
})
spark.stop()
}
}
"

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分佈式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分佈在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分佈式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
// 將文本分割成單詞RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//將單詞RDD轉換為(單詞,1)鍵值對RDD
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
//對wordPair 進行按鍵計數
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer +integer2;
}
});
// 到這裡為止,就得到了每個單詞出現的次數
// 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
// wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
// 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

// 進行key-value的反轉映射
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
return new Tuple2<Integer, String>(s._2,s._1);
}
});
// 按照key進行排序
JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
// 再次將value-key進行反轉映射
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
return new Tuple2<String, Integer>(s._2,s._1);
}
});
// 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
// 打印出來
sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times.");
}
});
sc.close();
}
}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
sortedWordCount.foreach(s->System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times."));
sc.close();
}
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt")
val words = lines.flatMap{line => line.split(" ")}
val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
val countWord = wordCounts.map{word =>(word._2,word._1)}
val sortedCountWord = countWord.sortByKey(false)
val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
sortedWordCount.foreach(s=>
{
println("word \\""+s._1+ "\\" appears "+s._2+" times.")
})
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰2

需求:

1、按照文件中的第一列排序。

2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法
  • 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
  • 3、使用sortByKey算子按照自定義的key進行排序(sortByKey)
  • 4、再次映射,剔除自定義的key,只保留文本行(map)
  • 5、打印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
override def compare(that: SecondSortKey): Int = {
if(this.first - that.first !=0){
this.first-that.first
}else{
this.second-that.second
}
}
}
object SecondSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\sort.txt")
val pairs = lines.map{line => (
new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
)}
val sortedParis = pairs.sortByKey()
val sortedLines = sortedParis.map(pairs => pairs._2)
sortedLines.foreach(s => println(s))
spark.stop()
}
}
"

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分佈式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分佈在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分佈式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
// 將文本分割成單詞RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//將單詞RDD轉換為(單詞,1)鍵值對RDD
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
//對wordPair 進行按鍵計數
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer +integer2;
}
});
// 到這裡為止,就得到了每個單詞出現的次數
// 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
// wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
// 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

// 進行key-value的反轉映射
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
return new Tuple2<Integer, String>(s._2,s._1);
}
});
// 按照key進行排序
JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
// 再次將value-key進行反轉映射
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
return new Tuple2<String, Integer>(s._2,s._1);
}
});
// 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
// 打印出來
sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times.");
}
});
sc.close();
}
}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
sortedWordCount.foreach(s->System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times."));
sc.close();
}
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt")
val words = lines.flatMap{line => line.split(" ")}
val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
val countWord = wordCounts.map{word =>(word._2,word._1)}
val sortedCountWord = countWord.sortByKey(false)
val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
sortedWordCount.foreach(s=>
{
println("word \\""+s._1+ "\\" appears "+s._2+" times.")
})
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰2

需求:

1、按照文件中的第一列排序。

2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法
  • 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
  • 3、使用sortByKey算子按照自定義的key進行排序(sortByKey)
  • 4、再次映射,剔除自定義的key,只保留文本行(map)
  • 5、打印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
override def compare(that: SecondSortKey): Int = {
if(this.first - that.first !=0){
this.first-that.first
}else{
this.second-that.second
}
}
}
object SecondSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\sort.txt")
val pairs = lines.map{line => (
new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
)}
val sortedParis = pairs.sortByKey()
val sortedLines = sortedParis.map(pairs => pairs._2)
sortedLines.foreach(s => println(s))
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰3

需求:

對每個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.創建初始RDD

2.對初始RDD的文本行按空格分割,映射為key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將數組緩衝按從大到小排序
  • 4.4 對排序後的數組緩衝取其前三

5.打印輸出

以下是使用scala實現:

object GroupTop3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
//創建初始RDD
val lines = spark.sparkContext.textFile("D:\\\\score.txt")
//對初始RDD的文本行按空格分割,映射為key-value鍵值對
val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
//對pairs鍵值對按鍵分組
val groupedPairs = pairs.groupByKey()
//獲取分組後每組前3的成績
val top3Score = groupedPairs.map(classScores => {
var className = classScores._1
//獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三
var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
Tuple2(className,top3)
})
top3Score.foreach(m => {
println(m._1)
for(s <- m._2) println(s)
println("------------------")
})
}
}
"

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分佈式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分佈在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分佈式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
// 將文本分割成單詞RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//將單詞RDD轉換為(單詞,1)鍵值對RDD
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
//對wordPair 進行按鍵計數
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer +integer2;
}
});
// 到這裡為止,就得到了每個單詞出現的次數
// 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
// wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
// 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

// 進行key-value的反轉映射
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
return new Tuple2<Integer, String>(s._2,s._1);
}
});
// 按照key進行排序
JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
// 再次將value-key進行反轉映射
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
return new Tuple2<String, Integer>(s._2,s._1);
}
});
// 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
// 打印出來
sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times.");
}
});
sc.close();
}
}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
sortedWordCount.foreach(s->System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times."));
sc.close();
}
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt")
val words = lines.flatMap{line => line.split(" ")}
val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
val countWord = wordCounts.map{word =>(word._2,word._1)}
val sortedCountWord = countWord.sortByKey(false)
val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
sortedWordCount.foreach(s=>
{
println("word \\""+s._1+ "\\" appears "+s._2+" times.")
})
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰2

需求:

1、按照文件中的第一列排序。

2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法
  • 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
  • 3、使用sortByKey算子按照自定義的key進行排序(sortByKey)
  • 4、再次映射,剔除自定義的key,只保留文本行(map)
  • 5、打印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
override def compare(that: SecondSortKey): Int = {
if(this.first - that.first !=0){
this.first-that.first
}else{
this.second-that.second
}
}
}
object SecondSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\sort.txt")
val pairs = lines.map{line => (
new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
)}
val sortedParis = pairs.sortByKey()
val sortedLines = sortedParis.map(pairs => pairs._2)
sortedLines.foreach(s => println(s))
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰3

需求:

對每個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.創建初始RDD

2.對初始RDD的文本行按空格分割,映射為key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將數組緩衝按從大到小排序
  • 4.4 對排序後的數組緩衝取其前三

5.打印輸出

以下是使用scala實現:

object GroupTop3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
//創建初始RDD
val lines = spark.sparkContext.textFile("D:\\\\score.txt")
//對初始RDD的文本行按空格分割,映射為key-value鍵值對
val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
//對pairs鍵值對按鍵分組
val groupedPairs = pairs.groupByKey()
//獲取分組後每組前3的成績
val top3Score = groupedPairs.map(classScores => {
var className = classScores._1
//獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三
var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
Tuple2(className,top3)
})
top3Score.foreach(m => {
println(m._1)
for(s <- m._2) println(s)
println("------------------")
})
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

以上三個小案例都用Scala實現了,用到了Scala中的集合的操作、高階函數、鏈式調用、隱式轉換等知識,自己動手實現,對Scala有個比較好的理解和掌握。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”免費來進行獲取~~~~

"

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分佈式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分佈在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分佈式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
// 將文本分割成單詞RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//將單詞RDD轉換為(單詞,1)鍵值對RDD
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
//對wordPair 進行按鍵計數
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer +integer2;
}
});
// 到這裡為止,就得到了每個單詞出現的次數
// 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
// wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
// 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

// 進行key-value的反轉映射
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
return new Tuple2<Integer, String>(s._2,s._1);
}
});
// 按照key進行排序
JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
// 再次將value-key進行反轉映射
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
return new Tuple2<String, Integer>(s._2,s._1);
}
});
// 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
// 打印出來
sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times.");
}
});
sc.close();
}
}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
sortedWordCount.foreach(s->System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times."));
sc.close();
}
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt")
val words = lines.flatMap{line => line.split(" ")}
val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
val countWord = wordCounts.map{word =>(word._2,word._1)}
val sortedCountWord = countWord.sortByKey(false)
val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
sortedWordCount.foreach(s=>
{
println("word \\""+s._1+ "\\" appears "+s._2+" times.")
})
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰2

需求:

1、按照文件中的第一列排序。

2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法
  • 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
  • 3、使用sortByKey算子按照自定義的key進行排序(sortByKey)
  • 4、再次映射,剔除自定義的key,只保留文本行(map)
  • 5、打印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
override def compare(that: SecondSortKey): Int = {
if(this.first - that.first !=0){
this.first-that.first
}else{
this.second-that.second
}
}
}
object SecondSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\sort.txt")
val pairs = lines.map{line => (
new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
)}
val sortedParis = pairs.sortByKey()
val sortedLines = sortedParis.map(pairs => pairs._2)
sortedLines.foreach(s => println(s))
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰3

需求:

對每個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.創建初始RDD

2.對初始RDD的文本行按空格分割,映射為key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將數組緩衝按從大到小排序
  • 4.4 對排序後的數組緩衝取其前三

5.打印輸出

以下是使用scala實現:

object GroupTop3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
//創建初始RDD
val lines = spark.sparkContext.textFile("D:\\\\score.txt")
//對初始RDD的文本行按空格分割,映射為key-value鍵值對
val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
//對pairs鍵值對按鍵分組
val groupedPairs = pairs.groupByKey()
//獲取分組後每組前3的成績
val top3Score = groupedPairs.map(classScores => {
var className = classScores._1
//獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三
var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
Tuple2(className,top3)
})
top3Score.foreach(m => {
println(m._1)
for(s <- m._2) println(s)
println("------------------")
})
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

以上三個小案例都用Scala實現了,用到了Scala中的集合的操作、高階函數、鏈式調用、隱式轉換等知識,自己動手實現,對Scala有個比較好的理解和掌握。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”免費來進行獲取~~~~

這是你見過的史上最全的Spark知識學習總結嗎?

"

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分佈式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分佈在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分佈式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
// 將文本分割成單詞RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//將單詞RDD轉換為(單詞,1)鍵值對RDD
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
//對wordPair 進行按鍵計數
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer +integer2;
}
});
// 到這裡為止,就得到了每個單詞出現的次數
// 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
// wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
// 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

// 進行key-value的反轉映射
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
return new Tuple2<Integer, String>(s._2,s._1);
}
});
// 按照key進行排序
JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
// 再次將value-key進行反轉映射
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
return new Tuple2<String, Integer>(s._2,s._1);
}
});
// 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
// 打印出來
sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times.");
}
});
sc.close();
}
}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
sortedWordCount.foreach(s->System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times."));
sc.close();
}
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt")
val words = lines.flatMap{line => line.split(" ")}
val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
val countWord = wordCounts.map{word =>(word._2,word._1)}
val sortedCountWord = countWord.sortByKey(false)
val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
sortedWordCount.foreach(s=>
{
println("word \\""+s._1+ "\\" appears "+s._2+" times.")
})
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰2

需求:

1、按照文件中的第一列排序。

2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法
  • 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
  • 3、使用sortByKey算子按照自定義的key進行排序(sortByKey)
  • 4、再次映射,剔除自定義的key,只保留文本行(map)
  • 5、打印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
override def compare(that: SecondSortKey): Int = {
if(this.first - that.first !=0){
this.first-that.first
}else{
this.second-that.second
}
}
}
object SecondSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\sort.txt")
val pairs = lines.map{line => (
new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
)}
val sortedParis = pairs.sortByKey()
val sortedLines = sortedParis.map(pairs => pairs._2)
sortedLines.foreach(s => println(s))
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰3

需求:

對每個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.創建初始RDD

2.對初始RDD的文本行按空格分割,映射為key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將數組緩衝按從大到小排序
  • 4.4 對排序後的數組緩衝取其前三

5.打印輸出

以下是使用scala實現:

object GroupTop3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
//創建初始RDD
val lines = spark.sparkContext.textFile("D:\\\\score.txt")
//對初始RDD的文本行按空格分割,映射為key-value鍵值對
val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
//對pairs鍵值對按鍵分組
val groupedPairs = pairs.groupByKey()
//獲取分組後每組前3的成績
val top3Score = groupedPairs.map(classScores => {
var className = classScores._1
//獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三
var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
Tuple2(className,top3)
})
top3Score.foreach(m => {
println(m._1)
for(s <- m._2) println(s)
println("------------------")
})
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

以上三個小案例都用Scala實現了,用到了Scala中的集合的操作、高階函數、鏈式調用、隱式轉換等知識,自己動手實現,對Scala有個比較好的理解和掌握。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”免費來進行獲取~~~~

這是你見過的史上最全的Spark知識學習總結嗎?

這是你見過的史上最全的Spark知識學習總結嗎?

"

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分佈式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分佈在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分佈式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如重新計算一次。

共享變量

Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。

BroadcastVariable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。廣播變量是隻讀的。

val factor = 3
val broadcastVars = sc.broadcast(factor);
val numberList = Array(1,2,3,4,5)
val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)
val numberRDD = sc.parallelize(numberList,1)
val sum = sc.accumulator(0)
numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.打印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
// 將文本分割成單詞RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//將單詞RDD轉換為(單詞,1)鍵值對RDD
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
@Override
public Tuple2<String,Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
//對wordPair 進行按鍵計數
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer +integer2;
}
});
// 到這裡為止,就得到了每個單詞出現的次數
// 我們的新需求,是要按照每個單詞出現次數的順序,降序排序
// wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2)
// 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序

// 進行key-value的反轉映射
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
return new Tuple2<Integer, String>(s._2,s._1);
}
});
// 按照key進行排序
JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
// 再次將value-key進行反轉映射
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
return new Tuple2<String, Integer>(s._2,s._1);
}
});
// 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數
// 打印出來
sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times.");
}
});
sc.close();
}
}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 創建lines RDD
JavaRDD<String> lines = sc.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt");
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
sortedWordCount.foreach(s->System.out.println("word \\""+s._1+"\\" appears "+ s._2+" times."));
sc.close();
}
}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.core
import org.apache.spark.sql.SparkSession
object SortWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\Users\\\\Administrator\\\\Desktop\\\\spark.txt")
val words = lines.flatMap{line => line.split(" ")}
val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
val countWord = wordCounts.map{word =>(word._2,word._1)}
val sortedCountWord = countWord.sortByKey(false)
val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
sortedWordCount.foreach(s=>
{
println("word \\""+s._1+ "\\" appears "+s._2+" times.")
})
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰2

需求:

1、按照文件中的第一列排序。

2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered接口和Serializable接口,在key中實現自己對多個列的排序算法
  • 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
  • 3、使用sortByKey算子按照自定義的key進行排序(sortByKey)
  • 4、再次映射,剔除自定義的key,只保留文本行(map)
  • 5、打印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
override def compare(that: SecondSortKey): Int = {
if(this.first - that.first !=0){
this.first-that.first
}else{
this.second-that.second
}
}
}
object SecondSort {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
val lines = spark.sparkContext.textFile("D:\\\\sort.txt")
val pairs = lines.map{line => (
new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
)}
val sortedParis = pairs.sortByKey()
val sortedLines = sortedParis.map(pairs => pairs._2)
sortedLines.foreach(s => println(s))
spark.stop()
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

小案例實戰3

需求:

對每個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.創建初始RDD

2.對初始RDD的文本行按空格分割,映射為key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將數組緩衝按從大到小排序
  • 4.4 對排序後的數組緩衝取其前三

5.打印輸出

以下是使用scala實現:

object GroupTop3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
//創建初始RDD
val lines = spark.sparkContext.textFile("D:\\\\score.txt")
//對初始RDD的文本行按空格分割,映射為key-value鍵值對
val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
//對pairs鍵值對按鍵分組
val groupedPairs = pairs.groupByKey()
//獲取分組後每組前3的成績
val top3Score = groupedPairs.map(classScores => {
var className = classScores._1
//獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三
var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
Tuple2(className,top3)
})
top3Score.foreach(m => {
println(m._1)
for(s <- m._2) println(s)
println("------------------")
})
}
}
這是你見過的史上最全的Spark知識學習總結嗎?

以上三個小案例都用Scala實現了,用到了Scala中的集合的操作、高階函數、鏈式調用、隱式轉換等知識,自己動手實現,對Scala有個比較好的理解和掌握。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”免費來進行獲取~~~~

這是你見過的史上最全的Spark知識學習總結嗎?

這是你見過的史上最全的Spark知識學習總結嗎?

這是你見過的史上最全的Spark知識學習總結嗎?

"

相關推薦

推薦中...