美團是如何應用Spark處理大數據的?

前言

美團最初的數據處理以Hive SQL為主,底層計算引擎為MapReduce,部分相對複雜的業務會由工程師編寫MapReduce程序實現。隨著業務的發展,單純的Hive SQL查詢或者MapReduce程序已經越來越難以滿足數據處理和分析的需求。

一方面,MapReduce計算模型對多輪迭代的DAG作業支持不給力,每輪迭代都需要將數據落盤,極大地影響了作業執行效率,另外只提供Map和Reduce這兩種計算因子,使得用戶在實現迭代式計算(比如:機器學習算法)時成本高且效率低。

另一方面,在數據倉庫的按天生產中,由於某些原始日誌是半結構化或者非結構化數據,因此,對其進行清洗和轉換操作時,需要結合SQL查詢以及複雜的過程式邏輯處理,這部分工作之前是由Hive SQL結合Python腳本來完成。這種方式存在效率問題,當數據量比較大的時候,流程的運行時間較長,這些ETL流程通常處於比較上游的位置,會直接影響到一系列下游的完成時間以及各種重要數據報表的生成。

基於以上原因,美團在2014年的時候引入了Spark。為了充分利用現有Hadoop集群的資源,我們採用了Spark on Yarn模式,所有的Spark app以及MapReduce作業會通過Yarn統一調度執行。Spark在美團數據平臺架構中的位置如圖所示:

美團是如何應用Spark處理大數據的?

下面將介紹Spark在美團的實踐,包括基於Spark所做的平臺化工作以及Spark在生產環境下的應用案例。其中包含Zeppelin結合的交互式開發平臺,也有使用Spark任務完成的ETL數據轉換工具,數據挖掘組基於Spark開發了特徵平臺和數據挖掘平臺,另外還有基於Spark的交互式用戶行為分析系統以及在SEM投放服務中的應用,以下是詳細介紹。

Spark交互式開發平臺

在推廣如何使用Spark的過程中,我們總結了用戶開發應用的主要需求:

數據調研:在正式開發程序之前,首先需要認識待處理的業務數據,包括:數據格式,類型(若以表結構存儲則對應到字段類型)、存儲方式、有無髒數據,甚至分析根據業務邏輯實現是否可能存在數據傾斜等等。這個需求十分基礎且重要,只有對數據有充分的掌控,才能寫出高效的Spark代碼;

代碼調試:業務的編碼實現很難保證一蹴而就,可能需要不斷地調試;如果每次少量的修改,測試代碼都需要經過編譯、打包、提交線上,會對用戶的開發效率影響是非常大的;

聯合開發:對於一整個業務的實現,一般會有多方的協作,這時候需要能有一個方便的代碼和執行結果共享的途徑,用於分享各自的想法和試驗結論。

基於這些需求,我們調研了現有的開源系統,最終選擇了Apache的孵化項目Zeppelin,將其作為基於Spark的交互式開發平臺。Zeppelin整合了Spark,Markdown,Shell,Angular等引擎,集成了數據分析和可視化等功能。

美團是如何應用Spark處理大數據的?

我們在原生的Zeppelin上增加了用戶登陸認證、用戶行為日誌審計、權限管理以及執行Spark作業資源隔離,打造了一個美團的Spark的交互式開發平臺,不同的用戶可以在該平臺上調研數據、調試程序、共享代碼和結論。

集成在Zeppelin的Spark提供了三種解釋器:Spark、Pyspark、SQL,分別適用於編寫Scala、Python、SQL代碼。對於上述的數據調研需求,無論是程序設計之初,還是編碼實現過程中,當需要檢索數據信息時,通過Zeppelin提供的SQL接口可以很便利的獲取到分析結果;另外,Zeppelin中Scala和Python解釋器自身的交互式特性滿足了用戶對Spark和Pyspark分步調試的需求,同時由於Zeppelin可以直接連接線上集群,因此可以滿足用戶對線上數據的讀寫處理請求;最後,Zeppelin使用Web Socket通信,用戶只需要簡單地發送要分享內容所在的http鏈接,所有接受者就可以同步感知代碼修改,運行結果等,實現多個開發者協同工作。

Spark作業ETL模板 除了提供平臺化的工具以外,我們也會從其他方面來提高用戶的開發效率,比如將類似的需求進行封裝,提供一個統一的ETL模板,讓用戶可以很方便的使用Spark實現業務需求。

美團目前的數據生產主體是通過ETL將原始的日誌通過清洗、轉換等步驟後加載到Hive表中。而很多線上業務需要將Hive表裡面的數據以一定的規則組成鍵值對,導入到Tair中,用於上層應用快速訪問。其中大部分的需求邏輯相同,即把Hive表中幾個指定字段的值按一定的規則拼接成key值,另外幾個字段的值以json字符串的形式作為value值,最後將得到的對寫入Tair。

美團是如何應用Spark處理大數據的?

由於Hive表中的數據量一般較大,使用單機程序讀取數據和寫入Tair效率比較低,因此部分業務方決定使用Spark來實現這套邏輯。最初由業務方的工程師各自用Spark程序實現從Hive讀數據,寫入到Tair中(以下簡稱hive2Tair流程),這種情況下存在如下問題:

每個業務方都要自己實現一套邏輯類似的流程,產生大量重複的開發工作;

由於Spark是分佈式的計算引擎,因此代碼實現和參數設置不當很容易對Tair集群造成巨大壓力,影響Tair的正常服務。

基於以上原因,我們開發了Spark版的hive2Tair流程,並將其封裝成一個標準的ETL模板,其格式和內容如下所示:

美團是如何應用Spark處理大數據的?

source用於指定Hive表源數據,target指定目標Tair的庫和表,這兩個參數可以用於調度系統解析該ETL的上下游依賴關係,從而很方便地加入到現有的ETL生產體系中。

基於Spark的用戶特徵平臺

在沒有特徵平臺之前,各個數據挖掘人員按照各自項目的需求提取用戶特徵數據,主要是通過美團的ETL調度平臺按月/天來完成數據的提取。

但從用戶特徵來看,其實會有很多的重複工作,不同的項目需要的用戶特徵其實有很多是一樣的,為了減少冗餘的提取工作,也為了節省計算資源,建立特徵平臺的需求隨之誕生,特徵平臺只需要聚合各個開發人員已經提取的特徵數據,並提供給其他人使用。特徵平臺主要使用Spark的批處理功能來完成數據的提取和聚合。

開發人員提取特徵主要還是通過ETL來完成,有些數據使用Spark來處理,比如用戶搜索關鍵詞的統計。

開發人員提供的特徵數據,需要按照平臺提供的配置文件格式添加到特徵庫,比如在圖團購的配置文件中,團購業務中有一個用戶24小時時段支付的次數特徵,輸入就是一個生成好的特徵表,開發人員通過測試驗證無誤之後,即完成了數據上線;另外對於有些特徵,只需要從現有的表中提取部分特徵數據,開發人員也只需要簡單的配置即可完成。

美團是如何應用Spark處理大數據的?

在圖中,我們可以看到特徵聚合分兩層,第一層是各個業務數據內部聚合,比如團購的數據配置文件中會有很多的團購特徵、購買、瀏覽等分散在不同的表中,每個業務都會有獨立的Spark任務來完成聚合,構成一個用戶團購特徵表;特徵聚合是一個典型的join任務,對比MapReduce性能提升了10倍左右。第二層是把各個業務表數據再進行一次聚合,生成最終的用戶特徵數據表。

特徵庫中的特徵是可視化的,我們在聚合特徵時就會統計特徵覆蓋的人數,特徵的最大最小數值等,然後同步到RDB,這樣管理人員和開發者都能通過可視化來直觀地瞭解特徵。 另外,我們還提供特徵監測和告警,使用最近7天的特徵統計數據,對比各個特徵昨天和今天的覆蓋人數,是增多了還是減少了,比如性別為女這個特徵的覆蓋人數,如果發現今天的覆蓋人數比昨天低了1%(比如昨天6億用戶,女性2億,那麼人數降低了1%*2億=2萬)突然減少2萬女性用戶說明數據出現了極大的異常,何況網站的用戶數每天都是增長的。這些異常都會通過郵件發送到平臺和特徵提取的相關人。

Spark數據挖掘平臺

數據挖掘平臺是完全依賴於用戶特徵庫的,通過特徵庫提供用戶特徵,數據挖掘平臺對特徵進行轉換並統一格式輸出,就此開發人員可以快速完成模型的開發和迭代,之前需要兩週開發一個模型,現在短則需要幾個小時,多則幾天就能完成。特徵的轉換包括特徵名稱的編碼,也包括特徵值的平滑和歸一化,平臺也提供特徵離散化和特徵選擇的功能,這些都是使用Spark離線完成。

開發人員拿到訓練樣本之後,可以使用Spark mllib或者Python sklearn等完成模型訓練,得到最優化模型之後,將模型保存為平臺定義好的模型存儲格式,並提供相關配置參數,通過平臺即可完成模型上線,模型可以按天或者按周進行調度。當然如果模型需要重新訓練或者其它調整,那麼開發者還可以把模型下線。不只如此,平臺還提供了一個模型準確率告警的功能,每次模型在預測完成之後,會計算用戶提供的樣本中預測的準確率,並比較開發者提供的準確率告警閾值,如果低於閾值則發郵件通知開發者,是否需要對模型重新訓練。

在開發挖掘平臺的模型預測功時能我們走了點彎路,平臺的模型預測功能開始是兼容Spark接口的,也就是使用Spark保存和加載模型文件並預測,使用過的人知道Spark mllib的很多API都是私有的開發人員無法直接使用,所以我們這些接口進行封裝然後再提供給開發者使用,但也只解決了Spark開發人員的問題,平臺還需要兼容其他平臺的模型輸出和加載以及預測的功能,這讓我們面臨必需維護一個模型多個接口的問題,開發和維護成本都較高,最後還是放棄了兼容Spark接口的實現方式,我們自己定義了模型的保存格式,以及模型加載和模型預測的功能。

美團是如何應用Spark處理大數據的?

以上內容介紹了美團基於Spark所做的平臺化工作,這些平臺和工具是面向全公司所有業務線服務的,旨在避免各團隊做無意義的重複性工作,以及提高公司整體的數據生產效率。

隨著Spark的發展和推廣,從上游的ETL到下游的日常數據統計分析、推薦和搜索系統,越來越多的業務線開始嘗試使用Spark進行各種複雜的數據處理和分析工作。

下面將以Spark在交互式用戶行為分析系統以及SEM投放服務為例,介紹Spark在美團實際業務生產環境下的應用。

Spark在交互式用戶行為分析系統中的實踐 美團的交互式用戶行為分析系統,用於提供對海量的流量數據進行交互式分析的功能,系統的主要用戶為公司內部的PM和運營人員。

普通的BI類報表系統,只能夠提供對聚合後的指標進行查詢,比如PV、UV等相關指標。但是PM以及運營人員除了查看一些聚合指標以外,還需要根據自己的需求去分析某一類用戶的流量數據,進而瞭解各種用戶群體在App上的行為軌跡。根據這些數據,PM可以優化產品設計,運營人員可以為自己的運營工作提供數據支持,用戶核心的幾個訴求包括:

自助查詢,不同的PM或運營人員可能隨時需要執行各種各樣的分析功能,因此係統需要支持用戶自助使用。

響應速度,大部分分析功能都必須在幾分鐘內完成。

可視化,可以通過可視化的方式查看分析結果。

要解決上面的幾個問題,技術人員需要解決以下兩個核心問題:

海量數據的處理,用戶的流量數據全部存儲在Hive中,數據量非常龐大,每天的數據量都在數十億的規模。

快速計算結果,系統需要能夠隨時接收用戶提交的分析任務,並在幾分鐘之內計算出他們想要的結果。

要解決上面兩個問題,目前可供選擇的技術主要有兩種:MapReduce和Spark。在初期架構中選擇了使用MapReduce這種較為成熟的技術,但是通過測試發現,基於MapReduce開發的複雜分析任務需要數小時才能完成,這會造成極差的用戶體驗,用戶無法接受。

因此我們嘗試使用Spark這種內存式的快速大數據計算引擎作為系統架構中的核心部分,主要使用了Spark Core以及Spark SQL兩個組件,來實現各種複雜的業務邏輯。實踐中發現,雖然Spark的性能非常優秀,但是在目前的發展階段中,還是或多或少會有一些性能以及OOM方面的問題。

因此在項目的開發過程中,對大量Spark作業進行了各種各樣的性能調優,包括算子調優、參數調優、shuffle調優以及數據傾斜調優等,最終實現了所有Spark作業的執行時間都在數分鐘左右。並且在實踐中解決了一些shuffle以及數據傾斜導致的OOM問題,保證了系統的穩定性。

結合上述分析,最終的系統架構與工作流程如下所示:

用戶在系統界面中選擇某個分析功能對應的菜單,並進入對應的任務創建界面,然後選擇篩選條件和任務參數,並提交任務。

由於系統需要滿足不同類別的用戶行為分析功能(目前系統中已經提供了十個以上分析功能),因此需要為每一種分析功能都開發一個Spark作業。

採用J2EE技術開發了Web服務作為後臺系統,在接收到用戶提交的任務之後,根據任務類型選擇其對應的Spark作業,啟動一條子線程來執行Spark-submit命令以提交Spark作業。

Spark作業運行在Yarn集群上,並針對Hive中的海量數據進行計算,最終將計算結果寫入數據庫中。

用戶通過系統界面查看任務分析結果,J2EE系統負責將數據庫中的計算結果返回給界面進行展現。

美團是如何應用Spark處理大數據的?

該系統上線後效果良好:90%的Spark作業運行時間都在5分鐘以內,剩下10%的Spark作業運行時間在30分鐘左右,該速度足以快速響應用戶的分析需求。通過反饋來看,用戶體驗非常良好。目前每個月該系統都要執行數百個用戶行為分析任務,有效並且快速地支持了PM和運營人員的各種分析需求。

Spark在SEM投放服務中的應用

流量技術組負責著美團站外廣告的投放技術,目前在SEM、SEO、DSP等多種業務中大量使用了Spark平臺,包括離線挖掘、模型訓練、流數據處理等。美團SEM(搜索引擎營銷)投放著上億的關鍵詞,一個關鍵詞從被挖掘策略發現開始,就踏上了精彩的SEM之旅。它經過預估模型的篩選,投放到各大搜索引擎,可能因為市場競爭頻繁調價,也可能因為效果不佳被迫下線。而這樣的旅行,在美團每分鐘都在發生。如此大規模的隨機“遷徙”能夠順利進行,Spark功不可沒。

美團是如何應用Spark處理大數據的?

Spark不止用於美團SEM的關鍵詞挖掘、預估模型訓練、投放效果統計等大家能想到的場景,還罕見地用於關鍵詞的投放服務,這也是本段介紹的重點。一個快速穩定的投放系統是精準營銷的基礎。

美團早期的SEM投放服務採用的是單機版架構,隨著關鍵詞數量的極速增長,舊有服務存在的問題逐漸暴露。受限於各大搜索引擎API的配額(請求頻次)、賬戶結構等規則,投放服務只負責處理API請求是遠遠不夠的,還需要處理大量業務邏輯。單機程序在小數據量的情況下還能通過多進程勉強應對,但對於如此大規模的投放需求,就很難做到“兼顧全局”了。

結論和展望

本文介紹了美團引入Spark的起源,基於Spark所做的一些平臺化工作,以及Spark在美團具體應用場景下的實踐。總體而言,Spark由於其靈活的編程接口、高效的內存計算,能夠適用於大部分數據處理場景。

相關推薦

推薦中...