根據下面原文和網上的譯文結合自己的實際需要整理一下,當做筆記和回查!在給大家分享之前呢,小編推薦一下一個挺不錯的交流寶地,裡面都是一群熱愛並在學習Python的小夥伴們,大幾千了吧,各種各樣的人群都有,特別喜歡看到這種大家一起交流解決難題的氛圍,群資料也上傳了好多,各種大牛解決小白的問題,這個Python群:330637182 歡迎大家進來一起交流討論,一起進步,儘早掌握這門Python語言。
概述
從高層次上來看,每一個Spark應用都包含一個驅動程序,用於執行用戶的main函數以及在集群上運行各種並行操作。 Spark提供的主要抽象是彈性分佈式數據集( RDD ) ,這是一個包含諸多元素、被劃分到不同節點上進行並行處理的數據集合。 RDD通過打開HDFS(或其他hadoop支持的文件系統)上的一個文件、在驅動程序中打開一個已有的Scala集合或由其他 RDD轉換操作得到。 用戶可以要求Spark將RDD持久化到內存中,這樣就可以有效地在並行操作中複用。另外,在節點發生錯誤時RDD可以自動恢復。
Spark提供的 另一個抽象是可以在並行操作中使用的共享變量 。在默認情況下,當Spark將一個函數轉化成許多任務在不同的節點上運行的時候,對於所有在函數中使用的變量,每一個任務都會得到一個副本。有時,某一個變量需要在任務之間或任務與驅動程序之間共享。Spark支持兩種共享變量: 廣播變量 ,用來將一個值緩存到所有節點的內存中; 累加器 ,只能用於累加,比如計數器和求和。
連接Spark
Spark1.3.0只支持Python2.6或更高的版本(但不支持Python3), 但是Spark2.0.0支持 。它使用了標準的CPython解釋器,所以諸如NumPy一類的C庫也是可以使用的。
通過Spark目錄下的 bin/spark-submit
腳本你可以在Python中運行Spark應用。這個腳本會載入Spark的Java/Scala庫然後讓你將應用提交到集群中。 你可以執行 bin/pyspark 來打開Python的交互命令行。
如果你希望訪問HDFS上的數據,你需要為你使用的HDFS版本建立一個PySpark連接。常見的HDFS版本標籤都已經列在了這個第三方發行版頁面。
最後,你需要將一些Spark的類import到你的程序中。加入如下這行:
from pyspark import SparkContext, SparkConf
想要了解命令行選項的完整信息請執行 pyspark --help命令。在這些場景下,pyspark會觸發一個更通用的spark-submit腳本
並行集合的 一個重要參數是將數據集劃分成分片的數量 。對每一個分片,Spark會在集群中運行一個對應的任務。典型情況下,集群中的每一個CPU將對應運行2-4個分片。一般情況下,Spark會根據當前集群的情況自行設定分片數量。但是, 你也可以通過將第二個參數傳遞給parallelize方法(比如 sc.parallelize(data, 10)
)來手動確定分片數量 。注意:有些代碼中會使用切片(slice,分片的同義詞)這個術語來保持向下兼容性。
外部數據集
PySpark可以通過Hadoop支持的外部數據源(包括本地文件系統、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分佈數據集。Spark支持文本文件、 序列文件 以及其他任何 Hadoop輸入格式文件 。
通過文本文件創建RDD要使用SparkContext的textFile方法。這個方法會使用一個文件的URI(或本地文件路徑,hdfs://、s3n://這樣的URI等等)然後讀入這個文件建立一個文本行的集合。以下是一個例子:
distFile = sc.textFile("data.txt")
建立完成後distFile上就可以調用數據集操作了。比如,我們可以調用map和reduce操作來疊加所有文本行的長度,代碼如下:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
在Spark中讀入文件時有幾點要注意:
如果使用了本地文件路徑時,要保證在worker節點上這個文件也能夠通過這個路徑訪問。這點可以通過將這個文件拷貝到所有worker上或者使用網絡掛載的共享文件系統來解決。
包括textFile在內的所有基於文件的Spark讀入方法,都支持將文件夾、壓縮文件、包含通配符的路徑作為參數。比如,以下代碼都是合法的:
可寫類型支持
PySpark序列文件支持利用Java作為中介載入一個鍵值對RDD,將可寫類型轉化成Java的基本類型,然後使用Pyrolite將java結果對象串行化。當將一個鍵值對RDD儲存到一個序列文件中時PySpark將會運行上述過程的相反過程。首先將Python對象反串行化成Java對象,然後轉化成可寫類型。以下可寫類型會自動轉換:
可寫類型 | Python類型 |
---|---|
Text | unicode str |
IntWritable | int |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
數組是不能自動轉換的。用戶需要在讀寫時指定ArrayWritable的子類型.在讀入的時候,默認的轉換器會把自定義的ArrayWritable子類型轉化成Java的Object[],之後串行化成Python的元組。為了獲得Python的array.array類型來使用主要類型的數組,用戶需要自行指定轉換器。
保存和讀取序列文件
和文本文件類似,序列文件可以通過指定路徑來保存與讀取。鍵值類型都可以自行指定,但是對於標準可寫類型可以不指定。
RDD操作
RDD支持兩類操作: 轉化操作 ,用於從已有的數據集轉化產生新的數據集; 啟動操作 ,用於在計算結束後向驅動程序返回結果。舉個例子,map是一個轉化操作,可以將數據集中每一個元素傳給一個函數,同時將計算結果作為一個新的RDD返回。另一方面,reduce操作是一個啟動操作,能夠使用某些函數來聚集計算RDD中所有的元素,並且向驅動程序返回最終結果(同時還有一個並行的reduceByKey操作可以返回一個分佈數據集)。
在Spark所有的轉化操作都是 惰性求值 的,就是說它們並不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作對象(比如:一個文件)。只有當一個啟動操作被執行,要向驅動程序返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark運行更加高效——比如,我們會發覺由map操作產生的數據集將會在reduce操作中用到,之後僅僅是返回了reduce的最終的結果而不是map產生的龐大數據集。
在默認情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過調用persist(或cache)方法來將RDD 持久化 到內存中,這樣Spark就可以在下次使用這個數據集時快速獲得。Spark同樣提供了對將RDD持久化到硬盤上或在多個節點間複製的支持。
比如,傳遞一個無法轉化為 lambda表達式長函數,可以像以下代碼這樣:
轉化操作
轉化操作 | 作用 |
---|---|
map(func) | 返回一個新的分佈數據集,由原數據集元素經func處理後的結果組成 |
filter(func) | 返回一個新的數據集,由傳給func返回True的原數據集元素組成 |
flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值 |
mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是迭代器 |
mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是迭代器。返回值還是迭代器 |
sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,然後替換或不替換 |
union(otherDataset) | 返回新的數據集,包括原數據集和參數數據集的所有元素 |
intersection(otherDataset) | 返回新數據集,是兩個集的交集 |
distinct([numTasks]) | 返回新的集,包括原集中的不重複元素 |
groupByKey([numTasks]) | 當用於鍵值對RDD時返回(鍵,值迭代器)對的數據集 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算 |
sortByKey([ascending], [numTasks]) | 用於鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個參數決定 |
join(otherDataset, [numTasks]) | 用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD |
cogroup(otherDataset, [numTasks]) | 用於兩個鍵值對RDD時返回(K, (V迭代器, W迭代器))RDD |
cartesian(otherDataset) | 用於T和U類型RDD時返回(T, U)對類型鍵值對RDD |
pipe(command, [envVars]) | 通過shell命令管道處理每個RDD分片 |
coalesce(numPartitions) | 把RDD的分片數量降低到參數大小 |
repartition(numPartitions) | 重新打亂RDD中元素順序並重新分片,數量由參數決定 |
repartitionAndSortWithinPartitions(partitioner) | 按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序 |
下面的表格列出了Spark支持的常用轉化操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。
(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)
轉化操作 | 作用 |
---|---|
map(func) | 返回一個新的分佈數據集,由原數據集元素經func處理後的結果組成 |
filter(func) | 返回一個新的數據集,由傳給func返回True的原數據集元素組成 |
flatMap(func) | 與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值 |
mapParitions(func) | 類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是迭代器 |
mapParitionsWithIndex(func) | 類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是迭代器。返回值還是迭代器 |
sample(withReplacement, fraction, seed) | 使用提供的隨機數種子取樣,然後替換或不替換 |
union(otherDataset) | 返回新的數據集,包括原數據集和參數數據集的所有元素 |
intersection(otherDataset) | 返回新數據集,是兩個集的交集 |
distinct([numTasks]) | 返回新的集,包括原集中的不重複元素 |
groupByKey([numTasks]) | 當用於鍵值對RDD時返回(鍵,值迭代器)對的數據集 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算 |
sortByKey([ascending], [numTasks]) | 用於鍵值對RDD時會返回RDD按鍵的順序排序,升降序由第一個參數決定 |
join(otherDataset, [numTasks]) | 用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDD |
cogroup(otherDataset, [numTasks]) | 用於兩個鍵值對RDD時返回(K, (V迭代器, W迭代器))RDD |
cartesian(otherDataset) | 用於T和U類型RDD時返回(T, U)對類型鍵值對RDD |
pipe(command, [envVars]) | 通過shell命令管道處理每個RDD分片 |
coalesce(numPartitions) | 把RDD的分片數量降低到參數大小 |
repartition(numPartitions) | 重新打亂RDD中元素順序並重新分片,數量由參數決定 |
repartitionAndSortWithinPartitions(partitioner) | 按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序 |
啟動操作
下面的表格列出了Spark支持的部分常用啟動操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。
(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)
啟動操作 | 作用 |
---|---|
reduce(func) | 使用func進行聚集計算,func的參數是兩個,返回值一個,兩次func運行應當是完全解耦的,這樣才能正確地並行運算 |
collect() | 向驅動程序返回數據集的元素組成的數組 |
count() | 返回數據集元素的數量 |
first() | 返回數據集的第一個元素 |
take(n) | 返回前n個元素組成的數組 |
takeSample(withReplacement, num, [seed]) | 返回一個由原數據集中任意num個元素的suzuki,並且替換之 |
takeOrder(n, [ordering]) | 返回排序後的前n個元素 |
saveAsTextFile(path) | 將數據集的元素寫成文本文件 |
saveAsSequenceFile(path) | 將數據集的元素寫成序列文件,這個API只能用於Java和Scala程序 |
saveAsObjectFile(path) | 將數據集的元素使用Java的序列化特性寫到文件中,這個API只能用於Java和Scala程序 |
countByCount() | 只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數 |
foreach(func) | 對數據集的每個元素執行func, 通常用於完成一些帶有副作用的函數,比如更新累加器(見下文)或與外部存儲交互等 |
RDD持久化
Spark的一個重要功能就是在將數據集 持久化 (或 緩存 )到內存中以便在多個操作中重複使用。當我們持久化一個RDD是,每一個節點將這個RDD的每一個分片計算並保存到內存中以便在下次對這個數據集(或者這個數據集衍生的數據集)的計算中可以複用。這使得接下來的計算過程速度能夠加快(經常能加快超過十倍的速度)。緩存是加快迭代算法和快速交互過程速度的關鍵工具。
刪除數據
Spark會自動監視每個節點的緩存使用同時使用LRU算法丟棄舊數據分片。如果你想手動刪除某個RDD而不是等待它被自動刪除,調用 RDD.unpersist()方法。
共享變量
通常情況下,當一個函數傳遞給一個在遠程集群節點上運行的Spark操作(比如map和reduce)時,Spark會對涉及到的變量的所有副本執行這個函數。這些變量會被複制到每個機器上,而且這個過程不會被反饋給驅動程序。通常情況下,在任務之間讀寫共享變量是很低效的。但是,Spark仍然提供了有限的兩種共享變量類型用於常見的使用場景:廣播變量和累加器。
廣播變量
廣播變量允許程序員在每臺機器上保持一個只讀變量的緩存而不是將一個變量的拷貝傳遞給各個任務。它們可以被使用,比如,給每一個節點傳遞一份大輸入數據集的拷貝是很低效的。Spark試圖使用高效的廣播算法來分佈廣播變量,以此來降低通信花銷。
其他
集群部署
這個 應用提交指南 描述了一個應用被提交到集群上的過程。簡而言之,只要你把你的應用打成了JAR包(Java/Scala應用)或.py文件的集合或.zip壓縮包(Python應用),bin/spark-submit腳本會將應用提交到任意支持的集群管理器上。
感謝AsuraDong大大的分享!