Spark 的學習使用

Spark 科技 中國智慧城市 2017-06-25

最近一直在做數據統計相關的工作,主要是用 Spark 分析一些關鍵詞在微信中的傳播行為。這篇博客記錄一下學習的過程。

資源參數調優

使用公司的 Spark 平臺計算時,num_executors 設置的 1,導致每個任務需要跑 8 個多小時。num_executors=10 時僅需要 1 小時,num_executors=20 時需要 30 分鐘。繼續調整 num_executors=100 executor_cores=4 後需要 10 分鐘。這是由於參數設置不當導致資源沒有充分使用,跑任務任務浪費大量時間。

進行調優需要先大致瞭解 Spark 的基本運行原理:

Spark 的學習使用

num_executors 是執行器的個數,executor_cores 是執行器中 CPU 的個數。經過實踐,設置為下列參數比較合適。

num_executors :每個 Spark 作業運行一般設置 50-100 個左右的 executor 進程比較合適

exucutor-memory:每個 executor 進程設置 4G-8G

executor_cores:決定每個 Executor 進程並行執行 task 線程的能力

driver-memory:1G 左右

代碼調優

1、對多次使用的 RDD 進行持久化

var rdd = sc.textFile(“file:///d:/one_day_index.txt”).cache

cache 方法使用非序列化的的方式將 RDD 中的數據全部持久化到內存中

2、避免使用 Shuffer 類算子

Shuffer 是把分佈在多個節點上的同一個 Key 的數據拉取到同一個節點上,進行聚合或 Join 等操作。如 ReduceByKey 或 Join 等操作。

3、使用 Kryo 優化序列化性能

Kryo 比 Java 自帶的序列化庫性能要高 10 倍左右。

理解 flatMap

一篇文章分詞後有 [文章編號,詞 1,詞 2,詞 3…] 這樣的數據,需要生成 [文章編號,詞 1] [文章編號,詞 2] [文章編號,詞 3] ,以便後面對相同的詞進行 Reduce,變為 [詞 1,文章編號 1,文章編號 2…]。相當於倒排一次。

統計函數 processLineToPair 輸入為 [三元組,詞 1,詞 2,詞 3…],輸出為 Array[(String, DocObj)],擔心的是這樣返回在 RDD 裡數據只有一行,但經過 flapMap 後就變為了 RDD[(String, DocObj)],變為了多行保存在 RDD 中。

flatMap 的返回是一對多或一對零,而 Map 是一對一。

理解 reduceByKey

reduceByKey 的輸入和輸出是一樣的,經過 flapMap 後返回的 RDD 是有 Key:Value 的概念,所以可以默認去 ByKey 進行 Reduce。reduceByKey 的對象只能是 PairRDD。當需要把一個普通的 RDD 轉為 PairRDD 時間,可以調用 map 函數來實現,傳遞進 map 的函數需要返回鍵值對或者二元元組,二元元組會隱式轉換為 PairRDD。

base64 編解碼

base64 是把 3 個 8-bit 字節轉換為 4 個 6-bit 字節的編碼方式,Scala 中使用下面的方式

import org.apache.commons.codec.binary.Base64

Base64.decodeBase64(encode_uin)

Option 的使用

Option 可以包在返回值外面,相當於多了一個異常碼。返回為 none 則異常,返回為 Some 則正常。一般和 flapMap 結合使用。

SimpleModPartitioner 對結果進行分區

對數據進行 reduce 的時候可以指定保存結果的分區數,可以節省一步的 Shuffle。

rdd_hash_index.reduceByKey(new SimpleModPartitioner(2000), (v1, v2)

=> reduceMergeWord(v1, v2))

注意 rdd_hash_index 的 KEY 必須是數字才可以正確地 HASH

任務重跑時刪除文件夾

1、設置為直接覆蓋文件路徑,spark.hadoop.validateOutputSpecs 這種可能會導致以前的文件刪除不完全。

2、通過 spark 自帶的 hadoopconf 方式刪除

相關推薦

推薦中...