大數據學習之Spark快速入門指南(Quick Start Spark)

大數據學習之Spark快速入門指南(Quick Start Spark)

大數據學習之Spark快速入門指南(Quick Start Spark)

快速入門指南(Quick Start Spark)


這個文檔只是簡單的介紹如何快速地使用Spark。在下面的介紹中我將介紹如何通過Spark的交互式shell來使用API。

文章目錄

  • 1 Basics
  • 2 更多關於RDD上面的操作
  • 3 Caching

Basics

Spark shell提供一種簡單的方式來學習它的API,同時也提供強大的方式來交互式地分析數據。Spark shell支持Scala和Python。可以通過以下方式進入到Spark shell中。

# 本文原文地址:https://www.iteblog.com/archives/1040.html

# 過往記憶,大量關於Hadoop、Spark等個人原創技術博客

./bin/spark-shell

Spark的一個基本抽象概念就是RDD,RDDs可以通過Hadoop InputFormats或者通過其他的RDDs通過transforming來得到。下面的例子是通過加載SPARK_HOME目錄下的README文件來構建一個新的RDD

scala> val textFile = sc.textFile("file:///spark-bin-0.9.1/README.md")

textFile:org.apache.spark.rdd.RDD[String]=MappedRDD[3]at textFile at <console>:1

RDDs提供actions操作,通過它可以返回值;同時還提供 transformations操作,通過它可以返回一個新的RDD的引用。如下:

scala> textFile.count() // Number of items in this RDD

res1: Long = 108

scala> textFile.first() // First item in this RDD

res2: String = # Apache Spark

我們再試試transformations操作,下面的例子中我們通過使用filter transformation來一個新的RDD:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

linesWithSpark: org.apache.spark.rdd.RDD[String] = FilteredRDD[4] at

filter at <console>:14

我們將transformations操作和actions操作連起來操作:

scala> textFile.filter(line => line.contains("Spark")).count()

res3: Long = 15

更多關於RDD上面的操作

RDD的transformations操作和actions操作可以用於更復雜的計算。下面的例子是找出README.md文件中單詞數最多的行有多少個單詞

scala> var size = textFile.map(line=>line.split(" ").size)

scala> size.reduce((a, b)=>if (a > b) a else b)

res4: Long = 15

map函數負責將line按照空格分割,並得到這行單詞的數量,而reduce函數將獲取文件中單詞數最多的行有多少個單詞。map和reduce函數的參數是Scala的函數式編程風格。我們可以直接用Java裡面的Math.max()函數,這樣會使得這段代碼更好理解

scala> import java.lang.Math

import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b)=>Math.max(a, b))

res10: Int = 15

我們比較熟悉的一種數據流模式是MapReduce,Spark可以很簡單地實現MapReduce流

scala> val wordCounts = textFile.flatMap(line => line.split(" "))

.map(word => (word, 1)).reduceByKey((a, b) => a + b)

wordCounts: org.apache.spark.rdd.RDD[(String, Int)] =

MapPartitionsRDD[16] at reduceByKey at <console>:15

在上面的代碼中,我們結合了flatMap,map和reduceByKey等transformations 操作來計算文件中每個單詞的數量,並生成一個(String, Int) pairs形式的RDD。為了計算單詞的數量,我們可以用collect action來實現:

scala> wordCounts.collect()

res11: Array[(String, Int)]=Array(("",120),(submitting,1),(find,1),(versions,4),

((`./bin/pyspark`).,1), (Regression,1), (via,2), (tests,2), (open,2),

(./bin/spark-shell,1), (When,1), (All,1), (download,1), (requires,2),

(SPARK_YARN=true,3), (Testing,1), (take,1), (project,4), (no,1),

(systems.,1), (file,1), (<params>`.,1), (Or,,1), (`<dependencies>`,1),

(About,1), (project's,3), (`<master>`,1), (programs,2),(given.,1),(obtained,1),

(sbt/sbt,5), (artifact,1), (SBT,1), (local[2],1), (not,1), (runs.,1), (you,5),

(building,1), (Along,1), (Lightning-Fast,1), (built,,1), (Hadoop,,1), (use,2),

(MRv2,,1), (it,2), (directory.,1), (overview,1), (2.10.,1),(The,1),(easiest,1),

(Note,1), (guide](http://spark.apache.org/docs/latest/configuration.html),1),

(setup,1), ("org.apache.hadoop",1),...

Caching

Spark可以將數據集存放在集群中的緩存中。這個在數據集經常被訪問的場景下很有用,比如hot數據集的查詢,或者像PageRank這樣的需要迭代很多次的算法。作為一個簡單的列子,下面是將我們自己的linesWithSpark dataset存入到緩存中:

scala> linesWithSpark.cache()

res12: org.apache.spark.rdd.RDD[String] =FilteredRDD[4] at filter at <console>:14

scala> linesWithSpark.count()

res13: Long = 15

scala> linesWithSpark.count()

res14: Long = 15

利用Spark來緩存100行的數據看起來有點傻,但是我們可以通過同樣的函數來存儲非常大的數據集,甚至這些數據集分佈在幾十或者幾百臺節點上。

大數據學習之Spark快速入門指南(Quick Start Spark)

相關推薦

推薦中...