大數據學習之Spark快速入門指南(Quick Start Spark)
大數據學習之Spark快速入門指南(Quick Start Spark)
快速入門指南(Quick Start Spark)
這個文檔只是簡單的介紹如何快速地使用Spark。在下面的介紹中我將介紹如何通過Spark的交互式shell來使用API。
文章目錄
- 1 Basics
- 2 更多關於RDD上面的操作
- 3 Caching
Basics
Spark shell提供一種簡單的方式來學習它的API,同時也提供強大的方式來交互式地分析數據。Spark shell支持Scala和Python。可以通過以下方式進入到Spark shell中。
# 本文原文地址:https://www.iteblog.com/archives/1040.html
# 過往記憶,大量關於Hadoop、Spark等個人原創技術博客
./bin/spark-shell
Spark的一個基本抽象概念就是RDD,RDDs可以通過Hadoop InputFormats或者通過其他的RDDs通過transforming來得到。下面的例子是通過加載SPARK_HOME目錄下的README文件來構建一個新的RDD
scala> val textFile = sc.textFile("file:///spark-bin-0.9.1/README.md")
textFile:org.apache.spark.rdd.RDD[String]=MappedRDD[3]at textFile at <console>:1
RDDs提供actions操作,通過它可以返回值;同時還提供 transformations操作,通過它可以返回一個新的RDD的引用。如下:
scala> textFile.count() // Number of items in this RDD
res1: Long = 108
scala> textFile.first() // First item in this RDD
res2: String = # Apache Spark
我們再試試transformations操作,下面的例子中我們通過使用filter transformation來一個新的RDD:
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at
filter at <console>:14
我們將transformations操作和actions操作連起來操作:
scala> textFile.filter(line => line.contains("Spark")).count()
res3: Long = 15
更多關於RDD上面的操作
RDD的transformations操作和actions操作可以用於更復雜的計算。下面的例子是找出README.md文件中單詞數最多的行有多少個單詞
scala> var size = textFile.map(line=>line.split(" ").size)
scala> size.reduce((a, b)=>if (a > b) a else b)
res4: Long = 15
map函數負責將line按照空格分割,並得到這行單詞的數量,而reduce函數將獲取文件中單詞數最多的行有多少個單詞。map和reduce函數的參數是Scala的函數式編程風格。我們可以直接用Java裡面的Math.max()函數,這樣會使得這段代碼更好理解
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b)=>Math.max(a, b))
res10: Int = 15
我們比較熟悉的一種數據流模式是MapReduce,Spark可以很簡單地實現MapReduce流
scala> val wordCounts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] =
MapPartitionsRDD[16] at reduceByKey at <console>:15
在上面的代碼中,我們結合了flatMap,map和reduceByKey等transformations 操作來計算文件中每個單詞的數量,並生成一個(String, Int) pairs形式的RDD。為了計算單詞的數量,我們可以用collect action來實現:
scala> wordCounts.collect()
res11: Array[(String, Int)]=Array(("",120),(submitting,1),(find,1),(versions,4),
((`./bin/pyspark`).,1), (Regression,1), (via,2), (tests,2), (open,2),
(./bin/spark-shell,1), (When,1), (All,1), (download,1), (requires,2),
(SPARK_YARN=true,3), (Testing,1), (take,1), (project,4), (no,1),
(systems.,1), (file,1), (<params>`.,1), (Or,,1), (`<dependencies>`,1),
(About,1), (project's,3), (`<master>`,1), (programs,2),(given.,1),(obtained,1),
(sbt/sbt,5), (artifact,1), (SBT,1), (local[2],1), (not,1), (runs.,1), (you,5),
(building,1), (Along,1), (Lightning-Fast,1), (built,,1), (Hadoop,,1), (use,2),
(MRv2,,1), (it,2), (directory.,1), (overview,1), (2.10.,1),(The,1),(easiest,1),
(Note,1), (guide](http://spark.apache.org/docs/latest/configuration.html),1),
(setup,1), ("org.apache.hadoop",1),...
Caching
Spark可以將數據集存放在集群中的緩存中。這個在數據集經常被訪問的場景下很有用,比如hot數據集的查詢,或者像PageRank這樣的需要迭代很多次的算法。作為一個簡單的列子,下面是將我們自己的linesWithSpark dataset存入到緩存中:
scala> linesWithSpark.cache()
res12: org.apache.spark.rdd.RDD[String] =FilteredRDD[4] at filter at <console>:14
scala> linesWithSpark.count()
res13: Long = 15
scala> linesWithSpark.count()
res14: Long = 15
利用Spark來緩存100行的數據看起來有點傻,但是我們可以通過同樣的函數來存儲非常大的數據集,甚至這些數據集分佈在幾十或者幾百臺節點上。