'你是怎樣進行大數據之Spark性能分析和調優的?'

Spark 大數據 算法 跳槽那些事兒 中央處理器 程序員高級碼農 2019-08-14
"

Spark 的性能分析和調優很有意思,今天再寫一篇。主要話題是 shuffle,當然也牽涉一些其他代碼上的小把戲。

以前寫過一篇文章,比較了幾種不同場景的性能優化,包括 portal 的性能優化,web service 的性能優化,還有 Spark job 的性能優化。Spark 的性能優化有一些特殊的地方,比如實時性一般不在考慮範圍之內,通常我們用Spark來處理的數據,都是要求異步得到結果的數據;再比如數據量一般都很大,要不然也沒有必要在集群上操縱這麼一個大傢伙,等等。事實上,我們都知道沒有銀彈,但是每一種性能優化場景都有一些特定的“大 boss”,通常抓住和解決大 boss 以後,能解決其中一大部分問題。比如對於 portal 來說,是頁面靜態化,對於 web service 來說,是高併發(當然,這兩種可以說並不確切,這只是針對我參與的項目總結的經驗而已),而對於Spark來說,這個大 boss 就是 shuffle。

首先要明確什麼是 shuffle。Shuffle 指的是從map階段到 reduce 階段轉換的時候,即 map的 output 向著 reduce 的 input 映射的時候,並非節點一一對應的,即幹 map 工作的 slave A,它的輸出可能要分散跑到 reduce 節點 A、B、C、D …… X、Y、Z 去,就好像shuffle 的字面意思“洗牌”一樣,這些 map 的輸出數據要打散然後根據新的路由算法(比如對key進行某種 hash 算法),發送到不同的 reduce 節點上去。(下面這幅圖來自《Spark Architecture: Shuffle》)


"

Spark 的性能分析和調優很有意思,今天再寫一篇。主要話題是 shuffle,當然也牽涉一些其他代碼上的小把戲。

以前寫過一篇文章,比較了幾種不同場景的性能優化,包括 portal 的性能優化,web service 的性能優化,還有 Spark job 的性能優化。Spark 的性能優化有一些特殊的地方,比如實時性一般不在考慮範圍之內,通常我們用Spark來處理的數據,都是要求異步得到結果的數據;再比如數據量一般都很大,要不然也沒有必要在集群上操縱這麼一個大傢伙,等等。事實上,我們都知道沒有銀彈,但是每一種性能優化場景都有一些特定的“大 boss”,通常抓住和解決大 boss 以後,能解決其中一大部分問題。比如對於 portal 來說,是頁面靜態化,對於 web service 來說,是高併發(當然,這兩種可以說並不確切,這只是針對我參與的項目總結的經驗而已),而對於Spark來說,這個大 boss 就是 shuffle。

首先要明確什麼是 shuffle。Shuffle 指的是從map階段到 reduce 階段轉換的時候,即 map的 output 向著 reduce 的 input 映射的時候,並非節點一一對應的,即幹 map 工作的 slave A,它的輸出可能要分散跑到 reduce 節點 A、B、C、D …… X、Y、Z 去,就好像shuffle 的字面意思“洗牌”一樣,這些 map 的輸出數據要打散然後根據新的路由算法(比如對key進行某種 hash 算法),發送到不同的 reduce 節點上去。(下面這幅圖來自《Spark Architecture: Shuffle》)


你是怎樣進行大數據之Spark性能分析和調優的?


Spark 性能優化和 shuffle 搏鬥


為什麼說 shuffle 是 Spark job 的大 boss,就是因為 Spark 本身的計算通常都是在內存中完成的,比如這樣一個 map 結構的 RDD:(String, Seq),key 是字符串,value 是一個Seq,如果只是對 value 進行一一映射的 map 操作,比如(1)先計算 Seq 的長度,(2)再把這個長度作為元素添加到 Seq 裡面去。這兩步計算,都可以在 local 完成,而事實上也是在內存中操作完成的,換言之,不需要跑到別的 node 上去拿數據,因此執行的速度是非常快的。但是,如果對於一個大的 rdd,shuffle 發生的時候,就會因為網絡傳輸、數據序列化/反序列化產生大量的磁盤 IO 和 CPU 開銷。這個性能上的損失是非常巨大的。

要減少 shuffle 的開銷,主要有兩個思路:

減少 shuffle 次數,儘量不改變 key,把數據處理在 local 完成;

減少 shuffle 的數據規模。

先去重,再合併

比如有 A、B 這樣兩個規模比較大的 RDD,如果各自內部有大量重複,那麼二者一合併,再去重:

1A.union(B).distinct()


這樣的操作固然正確,但是如果可以先各自去重,再合併,再去重,可以大幅度減小shuffle 的開銷(注意 Spark 的默認 union 和 Oracle 裡面的“union all”很像——不去重):

1A.distinct().union(B.distinct()).distinct()


看起來變複雜了對不對,但是當時我解決這個問題的時候,用第二種方法時間開銷從3個小時減到20分鐘。

如果中間結果 rdd 如果被調用多次,可以顯式調用 cache()和 ersist(),以告知 Spark,保留當前 rdd。當然,即便不這麼做,Spark 依然存放不久前計算過的結果(以下來自官方指南):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.


數據量大,並不一定慢。通常情況下,由於 Spark 的 job 是放到內存裡面進行運算的,因此一個複雜的 map 操作不一定執行起來很慢。但是如果牽涉到 shuffle,這裡面有網絡傳輸和序列化的問題,就有可能非常慢。

類似地,還有 filter 等等操作,目的也是要先對大的 RDD 進行“瘦身”操作,然後在做其他操作。

mapValues 比 map 好

明確 key 不會變的 map,就用 mapValues 來替代,因為這樣可以保證 Spark 不會 shuffle 你的數據:

1A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}


改成:

1A.mapValues{case ((B, C), (D, E)) => (B, C, E)}


用 broadcast + filter 來代替 join

這種優化是一種特定場景的神器,就是拿大的 RDD A 去 join 一個小的 RDD B,比如有這樣兩個 RDD:

A 的結構為(name, age, sex),表示全國人民的 RDD,超大

B的結果為(age, title),表示“年齡 -> 稱號”的映射,比如60歲有稱號“花甲之年”,70歲則是“古稀之年”,這個 RDD 顯然很小,因為人的年齡範圍在 0~200 歲之間,而且有的“年齡”還沒有“稱號”

現在我要從全國人民中找出這些有稱號的人來。如果直接寫成:

123A.map{case (name, age, sex) => (age, (name, sex))}.join(B).map{case (age, ((name, sex), title)) => (name, age, sex)}


你就可以想象,執行的時候超大的A被打散和分發到各個節點去。而且更要命的是,為了恢復一開始的(name, age, sex)的結構,又做了一次map,而這次map一樣導致shuffle。兩次shuffle,太瘋狂了。但是如果這樣寫:

12val b = sc.broadcast(B.collectAsMap)A.filter{case (name, age, sex) => b.values.contains(age)}


一次 shuffle 都沒有,A 老老實實待著不動,等著全量的 B 被分發過來。

另外,在 Spark SQL 裡面直接有 BroadcastHashJoin,也是把小的 rdd 廣播出去。

不均勻的 shuffle

在工作中遇到這樣一個問題,需要轉換成這樣一個非常巨大的 RDD A,結構是(countryId, product),key 是國家 id,value 是商品的具體信息。當時在 shuffle 的時候,這個 hash 算法是根據 key來選擇節點的,但是事實上這個 countryId 的分佈是極其不均勻的,大部分商品都在美國(countryId=1),於是我們通過 Ganglia 看到,其中一臺 slave 的 CPU 特別高,計算全部聚集到那一臺去了。

找到原因以後,問題解決就容易了,要麼避免這個 shuffle,要麼改進一下 key,讓它的 shuffle 能夠均勻分佈(比如可以拿 countryId+ 商品名稱的 tuple 作 key,甚至生成一個隨機串)。

明確哪些操作必須在 master 完成

如果想打印一些東西到 stdout 裡去:

1A.foreach(println)


想把 RDD 的內容逐條打印出來,但是結果卻沒有出現在 stdout 裡面,因為這一步操作被放到 slave 上面去執行了。其實只需要 collect 一下,這些內容就被加載到 master 的內存中打印了:

1A.collect.foreach(println)


再比如,如果遇到 RDD 操作嵌套的情況,通常考慮優化掉,因為只有master才能去理解和執行 RDD 的操作,slave 只能處理被分配的 task 而已。比如:

1A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}


就可以用 join 來代替:

1A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}


用 reduceByKey 代替 groupByKey

這一條應該是比較經典的了。reduceByKey 會在當前節點(local)中做 reduce 操作,也就是說,會在 shuffle 前,儘可能地減小數據量。而 groupByKey 則不是,它會不做任何處理而直接去 shuffle。當然,有一些場景下,功能上二者並不能互相替換。因為 reduceByKey 要求參與運算的 value,並且和輸出的 value 類型要一樣,但是 groupByKey 則沒有這個要求。

有一些類似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。

另外,還有一條類似的是用 treeReduce 來代替 reduce,主要是用於單個 reduce 操作開銷比較大,可以條件 treeReduce 的深度來控制每次 reduce 的規模。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”來進行獲取~~~~

"

Spark 的性能分析和調優很有意思,今天再寫一篇。主要話題是 shuffle,當然也牽涉一些其他代碼上的小把戲。

以前寫過一篇文章,比較了幾種不同場景的性能優化,包括 portal 的性能優化,web service 的性能優化,還有 Spark job 的性能優化。Spark 的性能優化有一些特殊的地方,比如實時性一般不在考慮範圍之內,通常我們用Spark來處理的數據,都是要求異步得到結果的數據;再比如數據量一般都很大,要不然也沒有必要在集群上操縱這麼一個大傢伙,等等。事實上,我們都知道沒有銀彈,但是每一種性能優化場景都有一些特定的“大 boss”,通常抓住和解決大 boss 以後,能解決其中一大部分問題。比如對於 portal 來說,是頁面靜態化,對於 web service 來說,是高併發(當然,這兩種可以說並不確切,這只是針對我參與的項目總結的經驗而已),而對於Spark來說,這個大 boss 就是 shuffle。

首先要明確什麼是 shuffle。Shuffle 指的是從map階段到 reduce 階段轉換的時候,即 map的 output 向著 reduce 的 input 映射的時候,並非節點一一對應的,即幹 map 工作的 slave A,它的輸出可能要分散跑到 reduce 節點 A、B、C、D …… X、Y、Z 去,就好像shuffle 的字面意思“洗牌”一樣,這些 map 的輸出數據要打散然後根據新的路由算法(比如對key進行某種 hash 算法),發送到不同的 reduce 節點上去。(下面這幅圖來自《Spark Architecture: Shuffle》)


你是怎樣進行大數據之Spark性能分析和調優的?


Spark 性能優化和 shuffle 搏鬥


為什麼說 shuffle 是 Spark job 的大 boss,就是因為 Spark 本身的計算通常都是在內存中完成的,比如這樣一個 map 結構的 RDD:(String, Seq),key 是字符串,value 是一個Seq,如果只是對 value 進行一一映射的 map 操作,比如(1)先計算 Seq 的長度,(2)再把這個長度作為元素添加到 Seq 裡面去。這兩步計算,都可以在 local 完成,而事實上也是在內存中操作完成的,換言之,不需要跑到別的 node 上去拿數據,因此執行的速度是非常快的。但是,如果對於一個大的 rdd,shuffle 發生的時候,就會因為網絡傳輸、數據序列化/反序列化產生大量的磁盤 IO 和 CPU 開銷。這個性能上的損失是非常巨大的。

要減少 shuffle 的開銷,主要有兩個思路:

減少 shuffle 次數,儘量不改變 key,把數據處理在 local 完成;

減少 shuffle 的數據規模。

先去重,再合併

比如有 A、B 這樣兩個規模比較大的 RDD,如果各自內部有大量重複,那麼二者一合併,再去重:

1A.union(B).distinct()


這樣的操作固然正確,但是如果可以先各自去重,再合併,再去重,可以大幅度減小shuffle 的開銷(注意 Spark 的默認 union 和 Oracle 裡面的“union all”很像——不去重):

1A.distinct().union(B.distinct()).distinct()


看起來變複雜了對不對,但是當時我解決這個問題的時候,用第二種方法時間開銷從3個小時減到20分鐘。

如果中間結果 rdd 如果被調用多次,可以顯式調用 cache()和 ersist(),以告知 Spark,保留當前 rdd。當然,即便不這麼做,Spark 依然存放不久前計算過的結果(以下來自官方指南):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.


數據量大,並不一定慢。通常情況下,由於 Spark 的 job 是放到內存裡面進行運算的,因此一個複雜的 map 操作不一定執行起來很慢。但是如果牽涉到 shuffle,這裡面有網絡傳輸和序列化的問題,就有可能非常慢。

類似地,還有 filter 等等操作,目的也是要先對大的 RDD 進行“瘦身”操作,然後在做其他操作。

mapValues 比 map 好

明確 key 不會變的 map,就用 mapValues 來替代,因為這樣可以保證 Spark 不會 shuffle 你的數據:

1A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}


改成:

1A.mapValues{case ((B, C), (D, E)) => (B, C, E)}


用 broadcast + filter 來代替 join

這種優化是一種特定場景的神器,就是拿大的 RDD A 去 join 一個小的 RDD B,比如有這樣兩個 RDD:

A 的結構為(name, age, sex),表示全國人民的 RDD,超大

B的結果為(age, title),表示“年齡 -> 稱號”的映射,比如60歲有稱號“花甲之年”,70歲則是“古稀之年”,這個 RDD 顯然很小,因為人的年齡範圍在 0~200 歲之間,而且有的“年齡”還沒有“稱號”

現在我要從全國人民中找出這些有稱號的人來。如果直接寫成:

123A.map{case (name, age, sex) => (age, (name, sex))}.join(B).map{case (age, ((name, sex), title)) => (name, age, sex)}


你就可以想象,執行的時候超大的A被打散和分發到各個節點去。而且更要命的是,為了恢復一開始的(name, age, sex)的結構,又做了一次map,而這次map一樣導致shuffle。兩次shuffle,太瘋狂了。但是如果這樣寫:

12val b = sc.broadcast(B.collectAsMap)A.filter{case (name, age, sex) => b.values.contains(age)}


一次 shuffle 都沒有,A 老老實實待著不動,等著全量的 B 被分發過來。

另外,在 Spark SQL 裡面直接有 BroadcastHashJoin,也是把小的 rdd 廣播出去。

不均勻的 shuffle

在工作中遇到這樣一個問題,需要轉換成這樣一個非常巨大的 RDD A,結構是(countryId, product),key 是國家 id,value 是商品的具體信息。當時在 shuffle 的時候,這個 hash 算法是根據 key來選擇節點的,但是事實上這個 countryId 的分佈是極其不均勻的,大部分商品都在美國(countryId=1),於是我們通過 Ganglia 看到,其中一臺 slave 的 CPU 特別高,計算全部聚集到那一臺去了。

找到原因以後,問題解決就容易了,要麼避免這個 shuffle,要麼改進一下 key,讓它的 shuffle 能夠均勻分佈(比如可以拿 countryId+ 商品名稱的 tuple 作 key,甚至生成一個隨機串)。

明確哪些操作必須在 master 完成

如果想打印一些東西到 stdout 裡去:

1A.foreach(println)


想把 RDD 的內容逐條打印出來,但是結果卻沒有出現在 stdout 裡面,因為這一步操作被放到 slave 上面去執行了。其實只需要 collect 一下,這些內容就被加載到 master 的內存中打印了:

1A.collect.foreach(println)


再比如,如果遇到 RDD 操作嵌套的情況,通常考慮優化掉,因為只有master才能去理解和執行 RDD 的操作,slave 只能處理被分配的 task 而已。比如:

1A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}


就可以用 join 來代替:

1A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}


用 reduceByKey 代替 groupByKey

這一條應該是比較經典的了。reduceByKey 會在當前節點(local)中做 reduce 操作,也就是說,會在 shuffle 前,儘可能地減小數據量。而 groupByKey 則不是,它會不做任何處理而直接去 shuffle。當然,有一些場景下,功能上二者並不能互相替換。因為 reduceByKey 要求參與運算的 value,並且和輸出的 value 類型要一樣,但是 groupByKey 則沒有這個要求。

有一些類似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。

另外,還有一條類似的是用 treeReduce 來代替 reduce,主要是用於單個 reduce 操作開銷比較大,可以條件 treeReduce 的深度來控制每次 reduce 的規模。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”來進行獲取~~~~

你是怎樣進行大數據之Spark性能分析和調優的?

"

Spark 的性能分析和調優很有意思,今天再寫一篇。主要話題是 shuffle,當然也牽涉一些其他代碼上的小把戲。

以前寫過一篇文章,比較了幾種不同場景的性能優化,包括 portal 的性能優化,web service 的性能優化,還有 Spark job 的性能優化。Spark 的性能優化有一些特殊的地方,比如實時性一般不在考慮範圍之內,通常我們用Spark來處理的數據,都是要求異步得到結果的數據;再比如數據量一般都很大,要不然也沒有必要在集群上操縱這麼一個大傢伙,等等。事實上,我們都知道沒有銀彈,但是每一種性能優化場景都有一些特定的“大 boss”,通常抓住和解決大 boss 以後,能解決其中一大部分問題。比如對於 portal 來說,是頁面靜態化,對於 web service 來說,是高併發(當然,這兩種可以說並不確切,這只是針對我參與的項目總結的經驗而已),而對於Spark來說,這個大 boss 就是 shuffle。

首先要明確什麼是 shuffle。Shuffle 指的是從map階段到 reduce 階段轉換的時候,即 map的 output 向著 reduce 的 input 映射的時候,並非節點一一對應的,即幹 map 工作的 slave A,它的輸出可能要分散跑到 reduce 節點 A、B、C、D …… X、Y、Z 去,就好像shuffle 的字面意思“洗牌”一樣,這些 map 的輸出數據要打散然後根據新的路由算法(比如對key進行某種 hash 算法),發送到不同的 reduce 節點上去。(下面這幅圖來自《Spark Architecture: Shuffle》)


你是怎樣進行大數據之Spark性能分析和調優的?


Spark 性能優化和 shuffle 搏鬥


為什麼說 shuffle 是 Spark job 的大 boss,就是因為 Spark 本身的計算通常都是在內存中完成的,比如這樣一個 map 結構的 RDD:(String, Seq),key 是字符串,value 是一個Seq,如果只是對 value 進行一一映射的 map 操作,比如(1)先計算 Seq 的長度,(2)再把這個長度作為元素添加到 Seq 裡面去。這兩步計算,都可以在 local 完成,而事實上也是在內存中操作完成的,換言之,不需要跑到別的 node 上去拿數據,因此執行的速度是非常快的。但是,如果對於一個大的 rdd,shuffle 發生的時候,就會因為網絡傳輸、數據序列化/反序列化產生大量的磁盤 IO 和 CPU 開銷。這個性能上的損失是非常巨大的。

要減少 shuffle 的開銷,主要有兩個思路:

減少 shuffle 次數,儘量不改變 key,把數據處理在 local 完成;

減少 shuffle 的數據規模。

先去重,再合併

比如有 A、B 這樣兩個規模比較大的 RDD,如果各自內部有大量重複,那麼二者一合併,再去重:

1A.union(B).distinct()


這樣的操作固然正確,但是如果可以先各自去重,再合併,再去重,可以大幅度減小shuffle 的開銷(注意 Spark 的默認 union 和 Oracle 裡面的“union all”很像——不去重):

1A.distinct().union(B.distinct()).distinct()


看起來變複雜了對不對,但是當時我解決這個問題的時候,用第二種方法時間開銷從3個小時減到20分鐘。

如果中間結果 rdd 如果被調用多次,可以顯式調用 cache()和 ersist(),以告知 Spark,保留當前 rdd。當然,即便不這麼做,Spark 依然存放不久前計算過的結果(以下來自官方指南):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.


數據量大,並不一定慢。通常情況下,由於 Spark 的 job 是放到內存裡面進行運算的,因此一個複雜的 map 操作不一定執行起來很慢。但是如果牽涉到 shuffle,這裡面有網絡傳輸和序列化的問題,就有可能非常慢。

類似地,還有 filter 等等操作,目的也是要先對大的 RDD 進行“瘦身”操作,然後在做其他操作。

mapValues 比 map 好

明確 key 不會變的 map,就用 mapValues 來替代,因為這樣可以保證 Spark 不會 shuffle 你的數據:

1A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}


改成:

1A.mapValues{case ((B, C), (D, E)) => (B, C, E)}


用 broadcast + filter 來代替 join

這種優化是一種特定場景的神器,就是拿大的 RDD A 去 join 一個小的 RDD B,比如有這樣兩個 RDD:

A 的結構為(name, age, sex),表示全國人民的 RDD,超大

B的結果為(age, title),表示“年齡 -> 稱號”的映射,比如60歲有稱號“花甲之年”,70歲則是“古稀之年”,這個 RDD 顯然很小,因為人的年齡範圍在 0~200 歲之間,而且有的“年齡”還沒有“稱號”

現在我要從全國人民中找出這些有稱號的人來。如果直接寫成:

123A.map{case (name, age, sex) => (age, (name, sex))}.join(B).map{case (age, ((name, sex), title)) => (name, age, sex)}


你就可以想象,執行的時候超大的A被打散和分發到各個節點去。而且更要命的是,為了恢復一開始的(name, age, sex)的結構,又做了一次map,而這次map一樣導致shuffle。兩次shuffle,太瘋狂了。但是如果這樣寫:

12val b = sc.broadcast(B.collectAsMap)A.filter{case (name, age, sex) => b.values.contains(age)}


一次 shuffle 都沒有,A 老老實實待著不動,等著全量的 B 被分發過來。

另外,在 Spark SQL 裡面直接有 BroadcastHashJoin,也是把小的 rdd 廣播出去。

不均勻的 shuffle

在工作中遇到這樣一個問題,需要轉換成這樣一個非常巨大的 RDD A,結構是(countryId, product),key 是國家 id,value 是商品的具體信息。當時在 shuffle 的時候,這個 hash 算法是根據 key來選擇節點的,但是事實上這個 countryId 的分佈是極其不均勻的,大部分商品都在美國(countryId=1),於是我們通過 Ganglia 看到,其中一臺 slave 的 CPU 特別高,計算全部聚集到那一臺去了。

找到原因以後,問題解決就容易了,要麼避免這個 shuffle,要麼改進一下 key,讓它的 shuffle 能夠均勻分佈(比如可以拿 countryId+ 商品名稱的 tuple 作 key,甚至生成一個隨機串)。

明確哪些操作必須在 master 完成

如果想打印一些東西到 stdout 裡去:

1A.foreach(println)


想把 RDD 的內容逐條打印出來,但是結果卻沒有出現在 stdout 裡面,因為這一步操作被放到 slave 上面去執行了。其實只需要 collect 一下,這些內容就被加載到 master 的內存中打印了:

1A.collect.foreach(println)


再比如,如果遇到 RDD 操作嵌套的情況,通常考慮優化掉,因為只有master才能去理解和執行 RDD 的操作,slave 只能處理被分配的 task 而已。比如:

1A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}


就可以用 join 來代替:

1A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}


用 reduceByKey 代替 groupByKey

這一條應該是比較經典的了。reduceByKey 會在當前節點(local)中做 reduce 操作,也就是說,會在 shuffle 前,儘可能地減小數據量。而 groupByKey 則不是,它會不做任何處理而直接去 shuffle。當然,有一些場景下,功能上二者並不能互相替換。因為 reduceByKey 要求參與運算的 value,並且和輸出的 value 類型要一樣,但是 groupByKey 則沒有這個要求。

有一些類似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。

另外,還有一條類似的是用 treeReduce 來代替 reduce,主要是用於單個 reduce 操作開銷比較大,可以條件 treeReduce 的深度來控制每次 reduce 的規模。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”來進行獲取~~~~

你是怎樣進行大數據之Spark性能分析和調優的?

你是怎樣進行大數據之Spark性能分析和調優的?

"

Spark 的性能分析和調優很有意思,今天再寫一篇。主要話題是 shuffle,當然也牽涉一些其他代碼上的小把戲。

以前寫過一篇文章,比較了幾種不同場景的性能優化,包括 portal 的性能優化,web service 的性能優化,還有 Spark job 的性能優化。Spark 的性能優化有一些特殊的地方,比如實時性一般不在考慮範圍之內,通常我們用Spark來處理的數據,都是要求異步得到結果的數據;再比如數據量一般都很大,要不然也沒有必要在集群上操縱這麼一個大傢伙,等等。事實上,我們都知道沒有銀彈,但是每一種性能優化場景都有一些特定的“大 boss”,通常抓住和解決大 boss 以後,能解決其中一大部分問題。比如對於 portal 來說,是頁面靜態化,對於 web service 來說,是高併發(當然,這兩種可以說並不確切,這只是針對我參與的項目總結的經驗而已),而對於Spark來說,這個大 boss 就是 shuffle。

首先要明確什麼是 shuffle。Shuffle 指的是從map階段到 reduce 階段轉換的時候,即 map的 output 向著 reduce 的 input 映射的時候,並非節點一一對應的,即幹 map 工作的 slave A,它的輸出可能要分散跑到 reduce 節點 A、B、C、D …… X、Y、Z 去,就好像shuffle 的字面意思“洗牌”一樣,這些 map 的輸出數據要打散然後根據新的路由算法(比如對key進行某種 hash 算法),發送到不同的 reduce 節點上去。(下面這幅圖來自《Spark Architecture: Shuffle》)


你是怎樣進行大數據之Spark性能分析和調優的?


Spark 性能優化和 shuffle 搏鬥


為什麼說 shuffle 是 Spark job 的大 boss,就是因為 Spark 本身的計算通常都是在內存中完成的,比如這樣一個 map 結構的 RDD:(String, Seq),key 是字符串,value 是一個Seq,如果只是對 value 進行一一映射的 map 操作,比如(1)先計算 Seq 的長度,(2)再把這個長度作為元素添加到 Seq 裡面去。這兩步計算,都可以在 local 完成,而事實上也是在內存中操作完成的,換言之,不需要跑到別的 node 上去拿數據,因此執行的速度是非常快的。但是,如果對於一個大的 rdd,shuffle 發生的時候,就會因為網絡傳輸、數據序列化/反序列化產生大量的磁盤 IO 和 CPU 開銷。這個性能上的損失是非常巨大的。

要減少 shuffle 的開銷,主要有兩個思路:

減少 shuffle 次數,儘量不改變 key,把數據處理在 local 完成;

減少 shuffle 的數據規模。

先去重,再合併

比如有 A、B 這樣兩個規模比較大的 RDD,如果各自內部有大量重複,那麼二者一合併,再去重:

1A.union(B).distinct()


這樣的操作固然正確,但是如果可以先各自去重,再合併,再去重,可以大幅度減小shuffle 的開銷(注意 Spark 的默認 union 和 Oracle 裡面的“union all”很像——不去重):

1A.distinct().union(B.distinct()).distinct()


看起來變複雜了對不對,但是當時我解決這個問題的時候,用第二種方法時間開銷從3個小時減到20分鐘。

如果中間結果 rdd 如果被調用多次,可以顯式調用 cache()和 ersist(),以告知 Spark,保留當前 rdd。當然,即便不這麼做,Spark 依然存放不久前計算過的結果(以下來自官方指南):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.


數據量大,並不一定慢。通常情況下,由於 Spark 的 job 是放到內存裡面進行運算的,因此一個複雜的 map 操作不一定執行起來很慢。但是如果牽涉到 shuffle,這裡面有網絡傳輸和序列化的問題,就有可能非常慢。

類似地,還有 filter 等等操作,目的也是要先對大的 RDD 進行“瘦身”操作,然後在做其他操作。

mapValues 比 map 好

明確 key 不會變的 map,就用 mapValues 來替代,因為這樣可以保證 Spark 不會 shuffle 你的數據:

1A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}


改成:

1A.mapValues{case ((B, C), (D, E)) => (B, C, E)}


用 broadcast + filter 來代替 join

這種優化是一種特定場景的神器,就是拿大的 RDD A 去 join 一個小的 RDD B,比如有這樣兩個 RDD:

A 的結構為(name, age, sex),表示全國人民的 RDD,超大

B的結果為(age, title),表示“年齡 -> 稱號”的映射,比如60歲有稱號“花甲之年”,70歲則是“古稀之年”,這個 RDD 顯然很小,因為人的年齡範圍在 0~200 歲之間,而且有的“年齡”還沒有“稱號”

現在我要從全國人民中找出這些有稱號的人來。如果直接寫成:

123A.map{case (name, age, sex) => (age, (name, sex))}.join(B).map{case (age, ((name, sex), title)) => (name, age, sex)}


你就可以想象,執行的時候超大的A被打散和分發到各個節點去。而且更要命的是,為了恢復一開始的(name, age, sex)的結構,又做了一次map,而這次map一樣導致shuffle。兩次shuffle,太瘋狂了。但是如果這樣寫:

12val b = sc.broadcast(B.collectAsMap)A.filter{case (name, age, sex) => b.values.contains(age)}


一次 shuffle 都沒有,A 老老實實待著不動,等著全量的 B 被分發過來。

另外,在 Spark SQL 裡面直接有 BroadcastHashJoin,也是把小的 rdd 廣播出去。

不均勻的 shuffle

在工作中遇到這樣一個問題,需要轉換成這樣一個非常巨大的 RDD A,結構是(countryId, product),key 是國家 id,value 是商品的具體信息。當時在 shuffle 的時候,這個 hash 算法是根據 key來選擇節點的,但是事實上這個 countryId 的分佈是極其不均勻的,大部分商品都在美國(countryId=1),於是我們通過 Ganglia 看到,其中一臺 slave 的 CPU 特別高,計算全部聚集到那一臺去了。

找到原因以後,問題解決就容易了,要麼避免這個 shuffle,要麼改進一下 key,讓它的 shuffle 能夠均勻分佈(比如可以拿 countryId+ 商品名稱的 tuple 作 key,甚至生成一個隨機串)。

明確哪些操作必須在 master 完成

如果想打印一些東西到 stdout 裡去:

1A.foreach(println)


想把 RDD 的內容逐條打印出來,但是結果卻沒有出現在 stdout 裡面,因為這一步操作被放到 slave 上面去執行了。其實只需要 collect 一下,這些內容就被加載到 master 的內存中打印了:

1A.collect.foreach(println)


再比如,如果遇到 RDD 操作嵌套的情況,通常考慮優化掉,因為只有master才能去理解和執行 RDD 的操作,slave 只能處理被分配的 task 而已。比如:

1A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}


就可以用 join 來代替:

1A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}


用 reduceByKey 代替 groupByKey

這一條應該是比較經典的了。reduceByKey 會在當前節點(local)中做 reduce 操作,也就是說,會在 shuffle 前,儘可能地減小數據量。而 groupByKey 則不是,它會不做任何處理而直接去 shuffle。當然,有一些場景下,功能上二者並不能互相替換。因為 reduceByKey 要求參與運算的 value,並且和輸出的 value 類型要一樣,但是 groupByKey 則沒有這個要求。

有一些類似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。

另外,還有一條類似的是用 treeReduce 來代替 reduce,主要是用於單個 reduce 操作開銷比較大,可以條件 treeReduce 的深度來控制每次 reduce 的規模。

大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。

小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,

轉發關注小編,直接私信小編“學習”來進行獲取~~~~

你是怎樣進行大數據之Spark性能分析和調優的?

你是怎樣進行大數據之Spark性能分析和調優的?

你是怎樣進行大數據之Spark性能分析和調優的?

"

相關推薦

推薦中...