'分佈式機器學習之——Spark MLlib並行訓練原理'

"

這裡是 王喆的機器學習筆記 的第二十五篇文章。接下來的幾篇文章希望與大家一同討論一下機器學習模型的分佈式訓練的問題。這個問題在推薦、廣告、搜索領域尤為突出,因為在互聯網場景下,動輒TB甚至PB級的數據量,幾乎不可能利用單點完成機器學習模型的訓練,分佈式機器學習訓練成為唯一的選擇。

在筆者看來,分佈式機器學習訓練有三個主要的方案,分別是Spark MLlib,Parameter ServerTensorFlow,倒不是說他們是唯三可供選擇的平臺,而是因為他們分別代表著三種主流的解決分佈式訓練方法。今天我們先從Spark MLlib說起,看看最流行的大數據計算平臺是如何處理機器學習模型的並行訓練問題的。

說起Spark,我想不會有任何算法工程師是陌生的。作為流行了至少五年的大數據項目,雖然受到了諸如Flink等後起之秀的挑戰,但其仍是當之無愧的業界最主流的計算平臺。而且為了照顧數據處理和模型訓練平臺的一致性,也有大量公司採用Spark原生的機器學習平臺MLlib進行模型訓練。選擇Spark MLlib作為機器學習分佈式訓練平臺的第一站,不僅因為Spark是流行的,更是因為Spark MLlib的並行訓練方法代表著一種樸素的,直觀的解決方案。

Spark的分佈式計算原理

在介紹Spark MLlib的分佈式機器學習訓練方法之前,讓我們先回顧一下Spark的分佈式計算原理,這是分佈式機器學習的基礎。

Spark,是一個分佈式的計算平臺。所謂分佈式,指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。要清楚的是,Spark最典型的應用方式是建立在大量廉價計算節點上,這些節點可以是廉價主機,也可以是虛擬的docker container;但這種方式區別於CPU+GPU的架構,或者共享內存多處理器的高性能服務器架構。清楚這一點,對於理解後續的Spark的計算原理是重要的。

"

這裡是 王喆的機器學習筆記 的第二十五篇文章。接下來的幾篇文章希望與大家一同討論一下機器學習模型的分佈式訓練的問題。這個問題在推薦、廣告、搜索領域尤為突出,因為在互聯網場景下,動輒TB甚至PB級的數據量,幾乎不可能利用單點完成機器學習模型的訓練,分佈式機器學習訓練成為唯一的選擇。

在筆者看來,分佈式機器學習訓練有三個主要的方案,分別是Spark MLlib,Parameter ServerTensorFlow,倒不是說他們是唯三可供選擇的平臺,而是因為他們分別代表著三種主流的解決分佈式訓練方法。今天我們先從Spark MLlib說起,看看最流行的大數據計算平臺是如何處理機器學習模型的並行訓練問題的。

說起Spark,我想不會有任何算法工程師是陌生的。作為流行了至少五年的大數據項目,雖然受到了諸如Flink等後起之秀的挑戰,但其仍是當之無愧的業界最主流的計算平臺。而且為了照顧數據處理和模型訓練平臺的一致性,也有大量公司採用Spark原生的機器學習平臺MLlib進行模型訓練。選擇Spark MLlib作為機器學習分佈式訓練平臺的第一站,不僅因為Spark是流行的,更是因為Spark MLlib的並行訓練方法代表著一種樸素的,直觀的解決方案。

Spark的分佈式計算原理

在介紹Spark MLlib的分佈式機器學習訓練方法之前,讓我們先回顧一下Spark的分佈式計算原理,這是分佈式機器學習的基礎。

Spark,是一個分佈式的計算平臺。所謂分佈式,指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。要清楚的是,Spark最典型的應用方式是建立在大量廉價計算節點上,這些節點可以是廉價主機,也可以是虛擬的docker container;但這種方式區別於CPU+GPU的架構,或者共享內存多處理器的高性能服務器架構。清楚這一點,對於理解後續的Spark的計算原理是重要的。

分佈式機器學習之——Spark MLlib並行訓練原理

圖1 Spark架構圖

從圖1的Spark架構圖中可以看到,Spark程序由Manager node進行調度組織,由Worker Node進行具體的計算任務執行,最終將結果返回給Drive Program。在物理的worker node上,數據還可能分為不同的partition,可以說partition是spark的基礎處理單元。

在執行具體的程序時,Spark會將程序拆解成一個任務DAG(有向無環圖),再根據DAG決定程序各步驟執行的方法。如圖2所示,該程序先分別從textFile和HadoopFile讀取文件,經過一些列操作後再進行join,最終得到處理結果。

"

這裡是 王喆的機器學習筆記 的第二十五篇文章。接下來的幾篇文章希望與大家一同討論一下機器學習模型的分佈式訓練的問題。這個問題在推薦、廣告、搜索領域尤為突出,因為在互聯網場景下,動輒TB甚至PB級的數據量,幾乎不可能利用單點完成機器學習模型的訓練,分佈式機器學習訓練成為唯一的選擇。

在筆者看來,分佈式機器學習訓練有三個主要的方案,分別是Spark MLlib,Parameter ServerTensorFlow,倒不是說他們是唯三可供選擇的平臺,而是因為他們分別代表著三種主流的解決分佈式訓練方法。今天我們先從Spark MLlib說起,看看最流行的大數據計算平臺是如何處理機器學習模型的並行訓練問題的。

說起Spark,我想不會有任何算法工程師是陌生的。作為流行了至少五年的大數據項目,雖然受到了諸如Flink等後起之秀的挑戰,但其仍是當之無愧的業界最主流的計算平臺。而且為了照顧數據處理和模型訓練平臺的一致性,也有大量公司採用Spark原生的機器學習平臺MLlib進行模型訓練。選擇Spark MLlib作為機器學習分佈式訓練平臺的第一站,不僅因為Spark是流行的,更是因為Spark MLlib的並行訓練方法代表著一種樸素的,直觀的解決方案。

Spark的分佈式計算原理

在介紹Spark MLlib的分佈式機器學習訓練方法之前,讓我們先回顧一下Spark的分佈式計算原理,這是分佈式機器學習的基礎。

Spark,是一個分佈式的計算平臺。所謂分佈式,指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。要清楚的是,Spark最典型的應用方式是建立在大量廉價計算節點上,這些節點可以是廉價主機,也可以是虛擬的docker container;但這種方式區別於CPU+GPU的架構,或者共享內存多處理器的高性能服務器架構。清楚這一點,對於理解後續的Spark的計算原理是重要的。

分佈式機器學習之——Spark MLlib並行訓練原理

圖1 Spark架構圖

從圖1的Spark架構圖中可以看到,Spark程序由Manager node進行調度組織,由Worker Node進行具體的計算任務執行,最終將結果返回給Drive Program。在物理的worker node上,數據還可能分為不同的partition,可以說partition是spark的基礎處理單元。

在執行具體的程序時,Spark會將程序拆解成一個任務DAG(有向無環圖),再根據DAG決定程序各步驟執行的方法。如圖2所示,該程序先分別從textFile和HadoopFile讀取文件,經過一些列操作後再進行join,最終得到處理結果。

分佈式機器學習之——Spark MLlib並行訓練原理

圖2 DAG,Spark任務的有向無環圖

在Spark平臺上並行處理圖2的DAG時,最關鍵的過程是找到哪些是可以並行處理的部分,哪些是必須shuffle和reduce的部分。

這裡的shuffle指的是所有partition(數據分片)的數據必須進行洗牌後才能得到下一步的數據,最典型的操作就是圖2中的groupByKey和join操作。拿join操作來說,必須通過在textFile數據中和hadoopFile數據中做全量的匹配才可以得到join後的dataframe。而groupby操作需要對數據中所有相同的key進行合併,也需要全局的shuffle才能夠完成。

與之相比,map,filter等操作僅需要逐條的進行數據處理和轉換就可以,不需要進行數據間的操作,因此各partition之間可以並行處理。

除此之外,在得到最終的計算結果之前,程序需要進行reduce的操作,從各partition上彙總統計結果,隨著partition的數量逐漸減小,reduce操作的並行程度逐漸降低,直到將最終的計算結果彙總到master節點上。

所以可以說shuffle和reduce操作的發生決定了純並行處理階段的邊界。如圖3所示,Spark的DAG被分割成了不同的並行處理階段(stage)。

"

這裡是 王喆的機器學習筆記 的第二十五篇文章。接下來的幾篇文章希望與大家一同討論一下機器學習模型的分佈式訓練的問題。這個問題在推薦、廣告、搜索領域尤為突出,因為在互聯網場景下,動輒TB甚至PB級的數據量,幾乎不可能利用單點完成機器學習模型的訓練,分佈式機器學習訓練成為唯一的選擇。

在筆者看來,分佈式機器學習訓練有三個主要的方案,分別是Spark MLlib,Parameter ServerTensorFlow,倒不是說他們是唯三可供選擇的平臺,而是因為他們分別代表著三種主流的解決分佈式訓練方法。今天我們先從Spark MLlib說起,看看最流行的大數據計算平臺是如何處理機器學習模型的並行訓練問題的。

說起Spark,我想不會有任何算法工程師是陌生的。作為流行了至少五年的大數據項目,雖然受到了諸如Flink等後起之秀的挑戰,但其仍是當之無愧的業界最主流的計算平臺。而且為了照顧數據處理和模型訓練平臺的一致性,也有大量公司採用Spark原生的機器學習平臺MLlib進行模型訓練。選擇Spark MLlib作為機器學習分佈式訓練平臺的第一站,不僅因為Spark是流行的,更是因為Spark MLlib的並行訓練方法代表著一種樸素的,直觀的解決方案。

Spark的分佈式計算原理

在介紹Spark MLlib的分佈式機器學習訓練方法之前,讓我們先回顧一下Spark的分佈式計算原理,這是分佈式機器學習的基礎。

Spark,是一個分佈式的計算平臺。所謂分佈式,指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。要清楚的是,Spark最典型的應用方式是建立在大量廉價計算節點上,這些節點可以是廉價主機,也可以是虛擬的docker container;但這種方式區別於CPU+GPU的架構,或者共享內存多處理器的高性能服務器架構。清楚這一點,對於理解後續的Spark的計算原理是重要的。

分佈式機器學習之——Spark MLlib並行訓練原理

圖1 Spark架構圖

從圖1的Spark架構圖中可以看到,Spark程序由Manager node進行調度組織,由Worker Node進行具體的計算任務執行,最終將結果返回給Drive Program。在物理的worker node上,數據還可能分為不同的partition,可以說partition是spark的基礎處理單元。

在執行具體的程序時,Spark會將程序拆解成一個任務DAG(有向無環圖),再根據DAG決定程序各步驟執行的方法。如圖2所示,該程序先分別從textFile和HadoopFile讀取文件,經過一些列操作後再進行join,最終得到處理結果。

分佈式機器學習之——Spark MLlib並行訓練原理

圖2 DAG,Spark任務的有向無環圖

在Spark平臺上並行處理圖2的DAG時,最關鍵的過程是找到哪些是可以並行處理的部分,哪些是必須shuffle和reduce的部分。

這裡的shuffle指的是所有partition(數據分片)的數據必須進行洗牌後才能得到下一步的數據,最典型的操作就是圖2中的groupByKey和join操作。拿join操作來說,必須通過在textFile數據中和hadoopFile數據中做全量的匹配才可以得到join後的dataframe。而groupby操作需要對數據中所有相同的key進行合併,也需要全局的shuffle才能夠完成。

與之相比,map,filter等操作僅需要逐條的進行數據處理和轉換就可以,不需要進行數據間的操作,因此各partition之間可以並行處理。

除此之外,在得到最終的計算結果之前,程序需要進行reduce的操作,從各partition上彙總統計結果,隨著partition的數量逐漸減小,reduce操作的並行程度逐漸降低,直到將最終的計算結果彙總到master節點上。

所以可以說shuffle和reduce操作的發生決定了純並行處理階段的邊界。如圖3所示,Spark的DAG被分割成了不同的並行處理階段(stage)。

分佈式機器學習之——Spark MLlib並行訓練原理

圖3 被shuffle操作分割的DAG stages

需要強調的是shuffle操作需要在不同計算節點之間進行數據交換,非常消耗計算、通信及存儲資源,因此shuffle操作是spark程序應該儘量避免的。一句話總結Spark的計算過程就是:Stage內部數據高效並行計算,Stage邊界處進行消耗資源的shuffle操作或者最終的reduce操作。

Spark MLlib並行訓練原理

有了Spark分佈式計算過程的基礎,下面就可以更清楚的理解Spark MLlib並行訓練的原理。

在所有主流的機器學習模型中,Random Forest的模型結構特點決定了其可以完全進行數據並行的模型訓練,而GBDT的結構特點則決定了樹之間只能進行串行的訓練,這裡就不再贅述其spark的實現方式,我們將重點放在梯度下降類方法的實現上,因為梯度下降的並行程度實現質量直接決定了以Logistic Regression為基礎,以Multiple Layer Perceptron為代表的深度學習模型的訓練速度。

這裡,我們深入到Spark MLlib的源碼中,直接把Spark做mini Batch梯度下降的源碼貼出如下(代碼摘自Spark 2.4.3 GradientDescent 類的 runMiniBatchSGD 函數):

"

這裡是 王喆的機器學習筆記 的第二十五篇文章。接下來的幾篇文章希望與大家一同討論一下機器學習模型的分佈式訓練的問題。這個問題在推薦、廣告、搜索領域尤為突出,因為在互聯網場景下,動輒TB甚至PB級的數據量,幾乎不可能利用單點完成機器學習模型的訓練,分佈式機器學習訓練成為唯一的選擇。

在筆者看來,分佈式機器學習訓練有三個主要的方案,分別是Spark MLlib,Parameter ServerTensorFlow,倒不是說他們是唯三可供選擇的平臺,而是因為他們分別代表著三種主流的解決分佈式訓練方法。今天我們先從Spark MLlib說起,看看最流行的大數據計算平臺是如何處理機器學習模型的並行訓練問題的。

說起Spark,我想不會有任何算法工程師是陌生的。作為流行了至少五年的大數據項目,雖然受到了諸如Flink等後起之秀的挑戰,但其仍是當之無愧的業界最主流的計算平臺。而且為了照顧數據處理和模型訓練平臺的一致性,也有大量公司採用Spark原生的機器學習平臺MLlib進行模型訓練。選擇Spark MLlib作為機器學習分佈式訓練平臺的第一站,不僅因為Spark是流行的,更是因為Spark MLlib的並行訓練方法代表著一種樸素的,直觀的解決方案。

Spark的分佈式計算原理

在介紹Spark MLlib的分佈式機器學習訓練方法之前,讓我們先回顧一下Spark的分佈式計算原理,這是分佈式機器學習的基礎。

Spark,是一個分佈式的計算平臺。所謂分佈式,指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。要清楚的是,Spark最典型的應用方式是建立在大量廉價計算節點上,這些節點可以是廉價主機,也可以是虛擬的docker container;但這種方式區別於CPU+GPU的架構,或者共享內存多處理器的高性能服務器架構。清楚這一點,對於理解後續的Spark的計算原理是重要的。

分佈式機器學習之——Spark MLlib並行訓練原理

圖1 Spark架構圖

從圖1的Spark架構圖中可以看到,Spark程序由Manager node進行調度組織,由Worker Node進行具體的計算任務執行,最終將結果返回給Drive Program。在物理的worker node上,數據還可能分為不同的partition,可以說partition是spark的基礎處理單元。

在執行具體的程序時,Spark會將程序拆解成一個任務DAG(有向無環圖),再根據DAG決定程序各步驟執行的方法。如圖2所示,該程序先分別從textFile和HadoopFile讀取文件,經過一些列操作後再進行join,最終得到處理結果。

分佈式機器學習之——Spark MLlib並行訓練原理

圖2 DAG,Spark任務的有向無環圖

在Spark平臺上並行處理圖2的DAG時,最關鍵的過程是找到哪些是可以並行處理的部分,哪些是必須shuffle和reduce的部分。

這裡的shuffle指的是所有partition(數據分片)的數據必須進行洗牌後才能得到下一步的數據,最典型的操作就是圖2中的groupByKey和join操作。拿join操作來說,必須通過在textFile數據中和hadoopFile數據中做全量的匹配才可以得到join後的dataframe。而groupby操作需要對數據中所有相同的key進行合併,也需要全局的shuffle才能夠完成。

與之相比,map,filter等操作僅需要逐條的進行數據處理和轉換就可以,不需要進行數據間的操作,因此各partition之間可以並行處理。

除此之外,在得到最終的計算結果之前,程序需要進行reduce的操作,從各partition上彙總統計結果,隨著partition的數量逐漸減小,reduce操作的並行程度逐漸降低,直到將最終的計算結果彙總到master節點上。

所以可以說shuffle和reduce操作的發生決定了純並行處理階段的邊界。如圖3所示,Spark的DAG被分割成了不同的並行處理階段(stage)。

分佈式機器學習之——Spark MLlib並行訓練原理

圖3 被shuffle操作分割的DAG stages

需要強調的是shuffle操作需要在不同計算節點之間進行數據交換,非常消耗計算、通信及存儲資源,因此shuffle操作是spark程序應該儘量避免的。一句話總結Spark的計算過程就是:Stage內部數據高效並行計算,Stage邊界處進行消耗資源的shuffle操作或者最終的reduce操作。

Spark MLlib並行訓練原理

有了Spark分佈式計算過程的基礎,下面就可以更清楚的理解Spark MLlib並行訓練的原理。

在所有主流的機器學習模型中,Random Forest的模型結構特點決定了其可以完全進行數據並行的模型訓練,而GBDT的結構特點則決定了樹之間只能進行串行的訓練,這裡就不再贅述其spark的實現方式,我們將重點放在梯度下降類方法的實現上,因為梯度下降的並行程度實現質量直接決定了以Logistic Regression為基礎,以Multiple Layer Perceptron為代表的深度學習模型的訓練速度。

這裡,我們深入到Spark MLlib的源碼中,直接把Spark做mini Batch梯度下降的源碼貼出如下(代碼摘自Spark 2.4.3 GradientDescent 類的 runMiniBatchSGD 函數):

分佈式機器學習之——Spark MLlib並行訓練原理

乍一看比較複雜,這裡可以為大家做一個精簡,只列出關鍵的操作部分,大家就可以一目瞭然Spark在做什麼。

"

這裡是 王喆的機器學習筆記 的第二十五篇文章。接下來的幾篇文章希望與大家一同討論一下機器學習模型的分佈式訓練的問題。這個問題在推薦、廣告、搜索領域尤為突出,因為在互聯網場景下,動輒TB甚至PB級的數據量,幾乎不可能利用單點完成機器學習模型的訓練,分佈式機器學習訓練成為唯一的選擇。

在筆者看來,分佈式機器學習訓練有三個主要的方案,分別是Spark MLlib,Parameter ServerTensorFlow,倒不是說他們是唯三可供選擇的平臺,而是因為他們分別代表著三種主流的解決分佈式訓練方法。今天我們先從Spark MLlib說起,看看最流行的大數據計算平臺是如何處理機器學習模型的並行訓練問題的。

說起Spark,我想不會有任何算法工程師是陌生的。作為流行了至少五年的大數據項目,雖然受到了諸如Flink等後起之秀的挑戰,但其仍是當之無愧的業界最主流的計算平臺。而且為了照顧數據處理和模型訓練平臺的一致性,也有大量公司採用Spark原生的機器學習平臺MLlib進行模型訓練。選擇Spark MLlib作為機器學習分佈式訓練平臺的第一站,不僅因為Spark是流行的,更是因為Spark MLlib的並行訓練方法代表著一種樸素的,直觀的解決方案。

Spark的分佈式計算原理

在介紹Spark MLlib的分佈式機器學習訓練方法之前,讓我們先回顧一下Spark的分佈式計算原理,這是分佈式機器學習的基礎。

Spark,是一個分佈式的計算平臺。所謂分佈式,指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。要清楚的是,Spark最典型的應用方式是建立在大量廉價計算節點上,這些節點可以是廉價主機,也可以是虛擬的docker container;但這種方式區別於CPU+GPU的架構,或者共享內存多處理器的高性能服務器架構。清楚這一點,對於理解後續的Spark的計算原理是重要的。

分佈式機器學習之——Spark MLlib並行訓練原理

圖1 Spark架構圖

從圖1的Spark架構圖中可以看到,Spark程序由Manager node進行調度組織,由Worker Node進行具體的計算任務執行,最終將結果返回給Drive Program。在物理的worker node上,數據還可能分為不同的partition,可以說partition是spark的基礎處理單元。

在執行具體的程序時,Spark會將程序拆解成一個任務DAG(有向無環圖),再根據DAG決定程序各步驟執行的方法。如圖2所示,該程序先分別從textFile和HadoopFile讀取文件,經過一些列操作後再進行join,最終得到處理結果。

分佈式機器學習之——Spark MLlib並行訓練原理

圖2 DAG,Spark任務的有向無環圖

在Spark平臺上並行處理圖2的DAG時,最關鍵的過程是找到哪些是可以並行處理的部分,哪些是必須shuffle和reduce的部分。

這裡的shuffle指的是所有partition(數據分片)的數據必須進行洗牌後才能得到下一步的數據,最典型的操作就是圖2中的groupByKey和join操作。拿join操作來說,必須通過在textFile數據中和hadoopFile數據中做全量的匹配才可以得到join後的dataframe。而groupby操作需要對數據中所有相同的key進行合併,也需要全局的shuffle才能夠完成。

與之相比,map,filter等操作僅需要逐條的進行數據處理和轉換就可以,不需要進行數據間的操作,因此各partition之間可以並行處理。

除此之外,在得到最終的計算結果之前,程序需要進行reduce的操作,從各partition上彙總統計結果,隨著partition的數量逐漸減小,reduce操作的並行程度逐漸降低,直到將最終的計算結果彙總到master節點上。

所以可以說shuffle和reduce操作的發生決定了純並行處理階段的邊界。如圖3所示,Spark的DAG被分割成了不同的並行處理階段(stage)。

分佈式機器學習之——Spark MLlib並行訓練原理

圖3 被shuffle操作分割的DAG stages

需要強調的是shuffle操作需要在不同計算節點之間進行數據交換,非常消耗計算、通信及存儲資源,因此shuffle操作是spark程序應該儘量避免的。一句話總結Spark的計算過程就是:Stage內部數據高效並行計算,Stage邊界處進行消耗資源的shuffle操作或者最終的reduce操作。

Spark MLlib並行訓練原理

有了Spark分佈式計算過程的基礎,下面就可以更清楚的理解Spark MLlib並行訓練的原理。

在所有主流的機器學習模型中,Random Forest的模型結構特點決定了其可以完全進行數據並行的模型訓練,而GBDT的結構特點則決定了樹之間只能進行串行的訓練,這裡就不再贅述其spark的實現方式,我們將重點放在梯度下降類方法的實現上,因為梯度下降的並行程度實現質量直接決定了以Logistic Regression為基礎,以Multiple Layer Perceptron為代表的深度學習模型的訓練速度。

這裡,我們深入到Spark MLlib的源碼中,直接把Spark做mini Batch梯度下降的源碼貼出如下(代碼摘自Spark 2.4.3 GradientDescent 類的 runMiniBatchSGD 函數):

分佈式機器學習之——Spark MLlib並行訓練原理

乍一看比較複雜,這裡可以為大家做一個精簡,只列出關鍵的操作部分,大家就可以一目瞭然Spark在做什麼。

分佈式機器學習之——Spark MLlib並行訓練原理

經過精簡的代碼非常簡單,Spark的mini batch過程製作了三件事:

1.把當前的模型參數廣播到各個數據partition(可當作虛擬的計算節點)

2.各計算節點進行數據抽樣得到mini batch的數據,分別計算梯度,再通過treeAggregate操作彙總梯度,得到最終梯度gradientSum

3.利用gradientSum更新模型權重

這樣一來,每次迭代的Stage和Stage的邊界就非常清楚了,Stage內部的並行部分是各節點分別採樣並計算梯度的過程,Stage的邊界是彙總加和各節點梯度的過程。這裡再強調一下彙總梯度的操作treeAggregate,該操作是進行類似樹結構的逐層彙總,整個操作流程如圖4所示。

"

這裡是 王喆的機器學習筆記 的第二十五篇文章。接下來的幾篇文章希望與大家一同討論一下機器學習模型的分佈式訓練的問題。這個問題在推薦、廣告、搜索領域尤為突出,因為在互聯網場景下,動輒TB甚至PB級的數據量,幾乎不可能利用單點完成機器學習模型的訓練,分佈式機器學習訓練成為唯一的選擇。

在筆者看來,分佈式機器學習訓練有三個主要的方案,分別是Spark MLlib,Parameter ServerTensorFlow,倒不是說他們是唯三可供選擇的平臺,而是因為他們分別代表著三種主流的解決分佈式訓練方法。今天我們先從Spark MLlib說起,看看最流行的大數據計算平臺是如何處理機器學習模型的並行訓練問題的。

說起Spark,我想不會有任何算法工程師是陌生的。作為流行了至少五年的大數據項目,雖然受到了諸如Flink等後起之秀的挑戰,但其仍是當之無愧的業界最主流的計算平臺。而且為了照顧數據處理和模型訓練平臺的一致性,也有大量公司採用Spark原生的機器學習平臺MLlib進行模型訓練。選擇Spark MLlib作為機器學習分佈式訓練平臺的第一站,不僅因為Spark是流行的,更是因為Spark MLlib的並行訓練方法代表著一種樸素的,直觀的解決方案。

Spark的分佈式計算原理

在介紹Spark MLlib的分佈式機器學習訓練方法之前,讓我們先回顧一下Spark的分佈式計算原理,這是分佈式機器學習的基礎。

Spark,是一個分佈式的計算平臺。所謂分佈式,指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。要清楚的是,Spark最典型的應用方式是建立在大量廉價計算節點上,這些節點可以是廉價主機,也可以是虛擬的docker container;但這種方式區別於CPU+GPU的架構,或者共享內存多處理器的高性能服務器架構。清楚這一點,對於理解後續的Spark的計算原理是重要的。

分佈式機器學習之——Spark MLlib並行訓練原理

圖1 Spark架構圖

從圖1的Spark架構圖中可以看到,Spark程序由Manager node進行調度組織,由Worker Node進行具體的計算任務執行,最終將結果返回給Drive Program。在物理的worker node上,數據還可能分為不同的partition,可以說partition是spark的基礎處理單元。

在執行具體的程序時,Spark會將程序拆解成一個任務DAG(有向無環圖),再根據DAG決定程序各步驟執行的方法。如圖2所示,該程序先分別從textFile和HadoopFile讀取文件,經過一些列操作後再進行join,最終得到處理結果。

分佈式機器學習之——Spark MLlib並行訓練原理

圖2 DAG,Spark任務的有向無環圖

在Spark平臺上並行處理圖2的DAG時,最關鍵的過程是找到哪些是可以並行處理的部分,哪些是必須shuffle和reduce的部分。

這裡的shuffle指的是所有partition(數據分片)的數據必須進行洗牌後才能得到下一步的數據,最典型的操作就是圖2中的groupByKey和join操作。拿join操作來說,必須通過在textFile數據中和hadoopFile數據中做全量的匹配才可以得到join後的dataframe。而groupby操作需要對數據中所有相同的key進行合併,也需要全局的shuffle才能夠完成。

與之相比,map,filter等操作僅需要逐條的進行數據處理和轉換就可以,不需要進行數據間的操作,因此各partition之間可以並行處理。

除此之外,在得到最終的計算結果之前,程序需要進行reduce的操作,從各partition上彙總統計結果,隨著partition的數量逐漸減小,reduce操作的並行程度逐漸降低,直到將最終的計算結果彙總到master節點上。

所以可以說shuffle和reduce操作的發生決定了純並行處理階段的邊界。如圖3所示,Spark的DAG被分割成了不同的並行處理階段(stage)。

分佈式機器學習之——Spark MLlib並行訓練原理

圖3 被shuffle操作分割的DAG stages

需要強調的是shuffle操作需要在不同計算節點之間進行數據交換,非常消耗計算、通信及存儲資源,因此shuffle操作是spark程序應該儘量避免的。一句話總結Spark的計算過程就是:Stage內部數據高效並行計算,Stage邊界處進行消耗資源的shuffle操作或者最終的reduce操作。

Spark MLlib並行訓練原理

有了Spark分佈式計算過程的基礎,下面就可以更清楚的理解Spark MLlib並行訓練的原理。

在所有主流的機器學習模型中,Random Forest的模型結構特點決定了其可以完全進行數據並行的模型訓練,而GBDT的結構特點則決定了樹之間只能進行串行的訓練,這裡就不再贅述其spark的實現方式,我們將重點放在梯度下降類方法的實現上,因為梯度下降的並行程度實現質量直接決定了以Logistic Regression為基礎,以Multiple Layer Perceptron為代表的深度學習模型的訓練速度。

這裡,我們深入到Spark MLlib的源碼中,直接把Spark做mini Batch梯度下降的源碼貼出如下(代碼摘自Spark 2.4.3 GradientDescent 類的 runMiniBatchSGD 函數):

分佈式機器學習之——Spark MLlib並行訓練原理

乍一看比較複雜,這裡可以為大家做一個精簡,只列出關鍵的操作部分,大家就可以一目瞭然Spark在做什麼。

分佈式機器學習之——Spark MLlib並行訓練原理

經過精簡的代碼非常簡單,Spark的mini batch過程製作了三件事:

1.把當前的模型參數廣播到各個數據partition(可當作虛擬的計算節點)

2.各計算節點進行數據抽樣得到mini batch的數據,分別計算梯度,再通過treeAggregate操作彙總梯度,得到最終梯度gradientSum

3.利用gradientSum更新模型權重

這樣一來,每次迭代的Stage和Stage的邊界就非常清楚了,Stage內部的並行部分是各節點分別採樣並計算梯度的過程,Stage的邊界是彙總加和各節點梯度的過程。這裡再強調一下彙總梯度的操作treeAggregate,該操作是進行類似樹結構的逐層彙總,整個操作流程如圖4所示。

分佈式機器學習之——Spark MLlib並行訓練原理

圖4 treeAggregate過程示意圖

事實上,treeAggregate是一次reduce操作,本身並不包含shuffle操作,再加上採用分層的樹形操作,在每層中都是並行執行的,因此整個過程是相對高效的。

在迭代次數達到上限或者模型已經充分收斂後,模型停止訓練。這就是Spark MLlib進行mini batch梯度下降的全過程,也是Spark MLlib實現分佈式機器學習的最典型代表。

總結來說,Spark MLlib的並行訓練過程其實是“數據並行”的過程,並不涉及到過於複雜的梯度更新策略,也沒有通過“參數並行”的方式實現並行訓練。這樣的方式簡單、直觀,易於實現,但也存在著一些侷限性。

Spark MLlib並行訓練的侷限性

雖然Spark MLlib基於分佈式集群,利用數據並行的方式實現了梯度下降的並行訓練,但是有Spark MLlib使用經驗的同學應該都清楚,使用Spark MLlib訓練複雜神經網絡時,往往力不從心,不僅訓練時間過長,而且在模型參數過多時,經常會存在內存溢出的問題。具體來講,Spark MLlib的分佈式訓練方法有下面幾個弊端:

1.採用全局廣播的方式,在每輪迭代前廣播全部模型參數。眾所周知Spark的廣播過程非常消耗帶寬資源,特別是當模型的參數規模過大時,廣播過程和在每個節點都維護一個權重參數副本的過程都是極消耗資源的過程,這導致了Spark在面對複雜模型時的表現不佳;

2.採用阻斷式的梯度下降方式,每輪梯度下降由最慢的節點決定。從上面的分析可知,Spark MLlib的mini batch的過程是在所有節點計算完各自的梯度之後,逐層Aggregate最終彙總生成全局的梯度。也就是說,如果由於數據傾斜等問題導致某個節點計算梯度的時間過長,那麼這一過程將block其他所有節點無法執行新的任務。這種同步阻斷的分佈式梯度計算方式,是Spark MLlib並行訓練效率較低的主要原因;

3.Spark MLlib並不支持複雜網絡結構和大量可調超參。事實上,Spark MLlib在其標準庫裡只支持標準的多層感知機神經網絡的訓練,並不支持RNN,LSTM等複雜網絡結構,而且也無法選擇不同的activation function等大量超參。這就導致Spark MLlib在支持深度學習方面的能力欠佳。

因為這些原因,如果想尋求更高效的訓練速度和更靈活的網絡結構,勢必需要尋求其他平臺的幫助。在這樣的情勢下,Parameter Server憑藉其高效的分佈式訓練手段成為分佈式機器學習的主流,而TensorFlow,PyTorch等深度學習平臺則憑藉靈活可調整的網絡結構,完整的訓練、上線支持,成為深度學習平臺的主要選擇。下兩篇內容,本專欄將分別介紹Patameter Server和TensorFlow的並行訓練原理。

例行的問題討論時間,其他與你一起討論和分享業界相關的經驗:

1.如果希望在Spark上訓練深度學習模型,你有沒有改進Spark的方法?使用第三方lib?還是修改Spark源碼?還是自研Spark模型?

2.在訓練完成Spark模型後,應該使用什麼方式將Spark模型deploy到線上環境,做線上的實時inference?

這裡是 王喆的機器學習筆記 的第二十五篇文章。

認為文章有價值的同學,歡迎關注我的 微信公眾號:王喆的機器學習筆記(wangzhenotes),跟蹤計算廣告、推薦系統等機器學習領域前沿。

想進一步交流的同學也可以通過公眾號加我的微信一同探討技術問題,謝謝。

—END—

每週關注計算廣告、推薦系統和其他機器學習前沿文章,歡迎關注王喆的機器學習筆記

"

相關推薦

推薦中...