'Spark Streaming 場景應用'
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
上述流程中,每 batch time 計算時,需要依賴最近 2 個 batch time 內的數據,經過轉換及相關統計,最終持久化到 MySQL 中去。不過為了確保每個計算僅計算 2 個 batch time 內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下 updateStateByKey 的實現,其代碼如下:
暴露了全局狀態數據中的 key 類型的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)}
隱藏了全局狀態數據中的 key 類型,僅對 Value 提供自定義的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner,initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)}
以上兩種方法分別給我們提供清理過期數據的思路:
- 泛型 K 進行過濾。K 表示全局狀態數據中對應的 key,如若 K 不滿足指定條件則反回 false;
- 返回值過濾。第二個方法中自定義函數指定了 Option[S] 返回值,若過期數據返回 None,那麼該數據將從全局狀態中清除。
三、Spark Streaming 監控
同 Spark 一樣,Spark Streaming 也提供了 Jobs、Stages、Storage、Enviorment、Executors 以及 Streaming 的監控,其中 Streaming 監控頁的內容如下圖:
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
上述流程中,每 batch time 計算時,需要依賴最近 2 個 batch time 內的數據,經過轉換及相關統計,最終持久化到 MySQL 中去。不過為了確保每個計算僅計算 2 個 batch time 內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下 updateStateByKey 的實現,其代碼如下:
暴露了全局狀態數據中的 key 類型的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)}
隱藏了全局狀態數據中的 key 類型,僅對 Value 提供自定義的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner,initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)}
以上兩種方法分別給我們提供清理過期數據的思路:
- 泛型 K 進行過濾。K 表示全局狀態數據中對應的 key,如若 K 不滿足指定條件則反回 false;
- 返回值過濾。第二個方法中自定義函數指定了 Option[S] 返回值,若過期數據返回 None,那麼該數據將從全局狀態中清除。
三、Spark Streaming 監控
同 Spark 一樣,Spark Streaming 也提供了 Jobs、Stages、Storage、Enviorment、Executors 以及 Streaming 的監控,其中 Streaming 監控頁的內容如下圖:
上圖是 Spark UI 中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展現。另外除了提供上述數據監控外,Spark UI 還提供了 Active Batches 以及 Completed Batches 相關信息。Active Batches 包含當前正在處理的 batch 信息以及堆積的 batch 相關信息,而 Completed Batches 剛提供每個 batch 處理的明細數據,具體包括 batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
上述流程中,每 batch time 計算時,需要依賴最近 2 個 batch time 內的數據,經過轉換及相關統計,最終持久化到 MySQL 中去。不過為了確保每個計算僅計算 2 個 batch time 內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下 updateStateByKey 的實現,其代碼如下:
暴露了全局狀態數據中的 key 類型的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)}
隱藏了全局狀態數據中的 key 類型,僅對 Value 提供自定義的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner,initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)}
以上兩種方法分別給我們提供清理過期數據的思路:
- 泛型 K 進行過濾。K 表示全局狀態數據中對應的 key,如若 K 不滿足指定條件則反回 false;
- 返回值過濾。第二個方法中自定義函數指定了 Option[S] 返回值,若過期數據返回 None,那麼該數據將從全局狀態中清除。
三、Spark Streaming 監控
同 Spark 一樣,Spark Streaming 也提供了 Jobs、Stages、Storage、Enviorment、Executors 以及 Streaming 的監控,其中 Streaming 監控頁的內容如下圖:
上圖是 Spark UI 中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展現。另外除了提供上述數據監控外,Spark UI 還提供了 Active Batches 以及 Completed Batches 相關信息。Active Batches 包含當前正在處理的 batch 信息以及堆積的 batch 相關信息,而 Completed Batches 剛提供每個 batch 處理的明細數據,具體包括 batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:
Spark Streaming 能夠提供如此優雅的數據監控,是因在對監聽器設計模式的使用。如若 Spark UI 無法滿足你所需的監控需要,用戶可以定製個性化監控信息。 Spark Streaming 提供了 StreamingListener 特質,通過繼承此方法,就可以定製所需的監控,其代碼如下:
@DeveloperApitrait StreamingListener {/** Called when a receiver has been started */def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }/** Called when a receiver has reported an error */def onReceiverError(receiverError: StreamingListenerReceiverError) { }/** Called when a receiver has been stopped */def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }/** Called when a batch of jobs has been submitted for processing. */def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }/** Called when processing of a batch of jobs has started. */def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }/** Called when processing of a batch of jobs has completed. */def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }/** Called when processing of a job of a batch has started. */def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted) { }/** Called when processing of a job of a batch has completed. */def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }}
目前,我們保存 Offsets 時,採用繼承 StreamingListener 方式,此是一種應用場景。當然也可以監控實時計算程序的堆積情況,並在達到一閾值後發送報警郵件。具體監聽器的定製還得依據應用場景而定。
四、Spark Streaming 優缺點
Spark Streaming 並非是 Storm 那樣,其並非是真正的流式處理框架,而是一次處理一批次數據。也正是這種方式,能夠較好地集成 Spark 其他計算模塊,包括 MLlib(機器學習)、Graphx 以及 Spark SQL。這給實時計算帶來很大的便利,與此帶來便利的同時,也犧牲作為流式的實時性等性能。
4.1 優點
- Spark Streaming 基於 Spark Core API,因此其能夠與 Spark 中的其他模塊保持良好的兼容性,為編程提供了良好的可擴展性;
- Spark Streaming 是粗粒度的準實時處理框架,一次讀取完或異步讀完之後處理數據,且其計算可基於大內存進行,因而具有較高的吞吐量;
- Spark Streaming 採用統一的 DAG 調度以及 RDD,因此能夠利用其lineage 機制,對實時計算有很好的容錯支持;
- Spark Streaming 的 DStream 是基於 RDD 的在流式數據處理方面的抽象,其 transformations 以及 actions 有較大的相似性,這在一定程度上降低了用戶的使用門檻,在熟悉 Spark 之後,能夠快速上手 Spark Streaming。
4.2 缺點
Spark Streaming 是準實時的數據處理框架,採用粗粒度的處理方式,當 batch time 到時才會觸發計算,這並非像 Storm 那樣是純流式的數據處理方式。此種方式不可避免會出現相應的計算延遲 。
目前來看,Spark Streaming 穩定性方面還是會存在一些問題。有時會因一些莫名的異常導致退出,這種情況下得需要自己來保證數據一致性以及失敗重啟功能等。
四、總結
本篇文章主要介紹了 Spark Streaming 在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;並且重點關注了下 Spark Streaming 在監控方面所作的努力。
首先本文介紹了 Spark Streaming 應用場景以及在我們的實際應用中所採取的技術架構。在此基礎上,引入無狀態計算模型以及有狀態模型兩種計算模型;接著通過監聽器模式介紹 Spark UI 相關監控信息等;最後對 Spark Streaming 的優缺點進行概括。
大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。但是,學習人工智能要求也是很高的,必須是研究生以上,你才能躋身這個行業。
眼下呢,,,先學習好大數據~~~強大自身!
小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,
轉發關注小編,直接私信小編“學習”來進行獲取~~~~
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
上述流程中,每 batch time 計算時,需要依賴最近 2 個 batch time 內的數據,經過轉換及相關統計,最終持久化到 MySQL 中去。不過為了確保每個計算僅計算 2 個 batch time 內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下 updateStateByKey 的實現,其代碼如下:
暴露了全局狀態數據中的 key 類型的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)}
隱藏了全局狀態數據中的 key 類型,僅對 Value 提供自定義的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner,initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)}
以上兩種方法分別給我們提供清理過期數據的思路:
- 泛型 K 進行過濾。K 表示全局狀態數據中對應的 key,如若 K 不滿足指定條件則反回 false;
- 返回值過濾。第二個方法中自定義函數指定了 Option[S] 返回值,若過期數據返回 None,那麼該數據將從全局狀態中清除。
三、Spark Streaming 監控
同 Spark 一樣,Spark Streaming 也提供了 Jobs、Stages、Storage、Enviorment、Executors 以及 Streaming 的監控,其中 Streaming 監控頁的內容如下圖:
上圖是 Spark UI 中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展現。另外除了提供上述數據監控外,Spark UI 還提供了 Active Batches 以及 Completed Batches 相關信息。Active Batches 包含當前正在處理的 batch 信息以及堆積的 batch 相關信息,而 Completed Batches 剛提供每個 batch 處理的明細數據,具體包括 batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:
Spark Streaming 能夠提供如此優雅的數據監控,是因在對監聽器設計模式的使用。如若 Spark UI 無法滿足你所需的監控需要,用戶可以定製個性化監控信息。 Spark Streaming 提供了 StreamingListener 特質,通過繼承此方法,就可以定製所需的監控,其代碼如下:
@DeveloperApitrait StreamingListener {/** Called when a receiver has been started */def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }/** Called when a receiver has reported an error */def onReceiverError(receiverError: StreamingListenerReceiverError) { }/** Called when a receiver has been stopped */def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }/** Called when a batch of jobs has been submitted for processing. */def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }/** Called when processing of a batch of jobs has started. */def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }/** Called when processing of a batch of jobs has completed. */def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }/** Called when processing of a job of a batch has started. */def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted) { }/** Called when processing of a job of a batch has completed. */def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }}
目前,我們保存 Offsets 時,採用繼承 StreamingListener 方式,此是一種應用場景。當然也可以監控實時計算程序的堆積情況,並在達到一閾值後發送報警郵件。具體監聽器的定製還得依據應用場景而定。
四、Spark Streaming 優缺點
Spark Streaming 並非是 Storm 那樣,其並非是真正的流式處理框架,而是一次處理一批次數據。也正是這種方式,能夠較好地集成 Spark 其他計算模塊,包括 MLlib(機器學習)、Graphx 以及 Spark SQL。這給實時計算帶來很大的便利,與此帶來便利的同時,也犧牲作為流式的實時性等性能。
4.1 優點
- Spark Streaming 基於 Spark Core API,因此其能夠與 Spark 中的其他模塊保持良好的兼容性,為編程提供了良好的可擴展性;
- Spark Streaming 是粗粒度的準實時處理框架,一次讀取完或異步讀完之後處理數據,且其計算可基於大內存進行,因而具有較高的吞吐量;
- Spark Streaming 採用統一的 DAG 調度以及 RDD,因此能夠利用其lineage 機制,對實時計算有很好的容錯支持;
- Spark Streaming 的 DStream 是基於 RDD 的在流式數據處理方面的抽象,其 transformations 以及 actions 有較大的相似性,這在一定程度上降低了用戶的使用門檻,在熟悉 Spark 之後,能夠快速上手 Spark Streaming。
4.2 缺點
Spark Streaming 是準實時的數據處理框架,採用粗粒度的處理方式,當 batch time 到時才會觸發計算,這並非像 Storm 那樣是純流式的數據處理方式。此種方式不可避免會出現相應的計算延遲 。
目前來看,Spark Streaming 穩定性方面還是會存在一些問題。有時會因一些莫名的異常導致退出,這種情況下得需要自己來保證數據一致性以及失敗重啟功能等。
四、總結
本篇文章主要介紹了 Spark Streaming 在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;並且重點關注了下 Spark Streaming 在監控方面所作的努力。
首先本文介紹了 Spark Streaming 應用場景以及在我們的實際應用中所採取的技術架構。在此基礎上,引入無狀態計算模型以及有狀態模型兩種計算模型;接著通過監聽器模式介紹 Spark UI 相關監控信息等;最後對 Spark Streaming 的優缺點進行概括。
大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。但是,學習人工智能要求也是很高的,必須是研究生以上,你才能躋身這個行業。
眼下呢,,,先學習好大數據~~~強大自身!
小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,
轉發關注小編,直接私信小編“學習”來進行獲取~~~~
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
上述流程中,每 batch time 計算時,需要依賴最近 2 個 batch time 內的數據,經過轉換及相關統計,最終持久化到 MySQL 中去。不過為了確保每個計算僅計算 2 個 batch time 內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下 updateStateByKey 的實現,其代碼如下:
暴露了全局狀態數據中的 key 類型的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)}
隱藏了全局狀態數據中的 key 類型,僅對 Value 提供自定義的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner,initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)}
以上兩種方法分別給我們提供清理過期數據的思路:
- 泛型 K 進行過濾。K 表示全局狀態數據中對應的 key,如若 K 不滿足指定條件則反回 false;
- 返回值過濾。第二個方法中自定義函數指定了 Option[S] 返回值,若過期數據返回 None,那麼該數據將從全局狀態中清除。
三、Spark Streaming 監控
同 Spark 一樣,Spark Streaming 也提供了 Jobs、Stages、Storage、Enviorment、Executors 以及 Streaming 的監控,其中 Streaming 監控頁的內容如下圖:
上圖是 Spark UI 中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展現。另外除了提供上述數據監控外,Spark UI 還提供了 Active Batches 以及 Completed Batches 相關信息。Active Batches 包含當前正在處理的 batch 信息以及堆積的 batch 相關信息,而 Completed Batches 剛提供每個 batch 處理的明細數據,具體包括 batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:
Spark Streaming 能夠提供如此優雅的數據監控,是因在對監聽器設計模式的使用。如若 Spark UI 無法滿足你所需的監控需要,用戶可以定製個性化監控信息。 Spark Streaming 提供了 StreamingListener 特質,通過繼承此方法,就可以定製所需的監控,其代碼如下:
@DeveloperApitrait StreamingListener {/** Called when a receiver has been started */def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }/** Called when a receiver has reported an error */def onReceiverError(receiverError: StreamingListenerReceiverError) { }/** Called when a receiver has been stopped */def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }/** Called when a batch of jobs has been submitted for processing. */def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }/** Called when processing of a batch of jobs has started. */def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }/** Called when processing of a batch of jobs has completed. */def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }/** Called when processing of a job of a batch has started. */def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted) { }/** Called when processing of a job of a batch has completed. */def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }}
目前,我們保存 Offsets 時,採用繼承 StreamingListener 方式,此是一種應用場景。當然也可以監控實時計算程序的堆積情況,並在達到一閾值後發送報警郵件。具體監聽器的定製還得依據應用場景而定。
四、Spark Streaming 優缺點
Spark Streaming 並非是 Storm 那樣,其並非是真正的流式處理框架,而是一次處理一批次數據。也正是這種方式,能夠較好地集成 Spark 其他計算模塊,包括 MLlib(機器學習)、Graphx 以及 Spark SQL。這給實時計算帶來很大的便利,與此帶來便利的同時,也犧牲作為流式的實時性等性能。
4.1 優點
- Spark Streaming 基於 Spark Core API,因此其能夠與 Spark 中的其他模塊保持良好的兼容性,為編程提供了良好的可擴展性;
- Spark Streaming 是粗粒度的準實時處理框架,一次讀取完或異步讀完之後處理數據,且其計算可基於大內存進行,因而具有較高的吞吐量;
- Spark Streaming 採用統一的 DAG 調度以及 RDD,因此能夠利用其lineage 機制,對實時計算有很好的容錯支持;
- Spark Streaming 的 DStream 是基於 RDD 的在流式數據處理方面的抽象,其 transformations 以及 actions 有較大的相似性,這在一定程度上降低了用戶的使用門檻,在熟悉 Spark 之後,能夠快速上手 Spark Streaming。
4.2 缺點
Spark Streaming 是準實時的數據處理框架,採用粗粒度的處理方式,當 batch time 到時才會觸發計算,這並非像 Storm 那樣是純流式的數據處理方式。此種方式不可避免會出現相應的計算延遲 。
目前來看,Spark Streaming 穩定性方面還是會存在一些問題。有時會因一些莫名的異常導致退出,這種情況下得需要自己來保證數據一致性以及失敗重啟功能等。
四、總結
本篇文章主要介紹了 Spark Streaming 在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;並且重點關注了下 Spark Streaming 在監控方面所作的努力。
首先本文介紹了 Spark Streaming 應用場景以及在我們的實際應用中所採取的技術架構。在此基礎上,引入無狀態計算模型以及有狀態模型兩種計算模型;接著通過監聽器模式介紹 Spark UI 相關監控信息等;最後對 Spark Streaming 的優缺點進行概括。
大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。但是,學習人工智能要求也是很高的,必須是研究生以上,你才能躋身這個行業。
眼下呢,,,先學習好大數據~~~強大自身!
小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,
轉發關注小編,直接私信小編“學習”來進行獲取~~~~
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
上述流程中,每 batch time 計算時,需要依賴最近 2 個 batch time 內的數據,經過轉換及相關統計,最終持久化到 MySQL 中去。不過為了確保每個計算僅計算 2 個 batch time 內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下 updateStateByKey 的實現,其代碼如下:
暴露了全局狀態數據中的 key 類型的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)}
隱藏了全局狀態數據中的 key 類型,僅對 Value 提供自定義的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner,initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)}
以上兩種方法分別給我們提供清理過期數據的思路:
- 泛型 K 進行過濾。K 表示全局狀態數據中對應的 key,如若 K 不滿足指定條件則反回 false;
- 返回值過濾。第二個方法中自定義函數指定了 Option[S] 返回值,若過期數據返回 None,那麼該數據將從全局狀態中清除。
三、Spark Streaming 監控
同 Spark 一樣,Spark Streaming 也提供了 Jobs、Stages、Storage、Enviorment、Executors 以及 Streaming 的監控,其中 Streaming 監控頁的內容如下圖:
上圖是 Spark UI 中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展現。另外除了提供上述數據監控外,Spark UI 還提供了 Active Batches 以及 Completed Batches 相關信息。Active Batches 包含當前正在處理的 batch 信息以及堆積的 batch 相關信息,而 Completed Batches 剛提供每個 batch 處理的明細數據,具體包括 batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:
Spark Streaming 能夠提供如此優雅的數據監控,是因在對監聽器設計模式的使用。如若 Spark UI 無法滿足你所需的監控需要,用戶可以定製個性化監控信息。 Spark Streaming 提供了 StreamingListener 特質,通過繼承此方法,就可以定製所需的監控,其代碼如下:
@DeveloperApitrait StreamingListener {/** Called when a receiver has been started */def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }/** Called when a receiver has reported an error */def onReceiverError(receiverError: StreamingListenerReceiverError) { }/** Called when a receiver has been stopped */def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }/** Called when a batch of jobs has been submitted for processing. */def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }/** Called when processing of a batch of jobs has started. */def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }/** Called when processing of a batch of jobs has completed. */def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }/** Called when processing of a job of a batch has started. */def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted) { }/** Called when processing of a job of a batch has completed. */def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }}
目前,我們保存 Offsets 時,採用繼承 StreamingListener 方式,此是一種應用場景。當然也可以監控實時計算程序的堆積情況,並在達到一閾值後發送報警郵件。具體監聽器的定製還得依據應用場景而定。
四、Spark Streaming 優缺點
Spark Streaming 並非是 Storm 那樣,其並非是真正的流式處理框架,而是一次處理一批次數據。也正是這種方式,能夠較好地集成 Spark 其他計算模塊,包括 MLlib(機器學習)、Graphx 以及 Spark SQL。這給實時計算帶來很大的便利,與此帶來便利的同時,也犧牲作為流式的實時性等性能。
4.1 優點
- Spark Streaming 基於 Spark Core API,因此其能夠與 Spark 中的其他模塊保持良好的兼容性,為編程提供了良好的可擴展性;
- Spark Streaming 是粗粒度的準實時處理框架,一次讀取完或異步讀完之後處理數據,且其計算可基於大內存進行,因而具有較高的吞吐量;
- Spark Streaming 採用統一的 DAG 調度以及 RDD,因此能夠利用其lineage 機制,對實時計算有很好的容錯支持;
- Spark Streaming 的 DStream 是基於 RDD 的在流式數據處理方面的抽象,其 transformations 以及 actions 有較大的相似性,這在一定程度上降低了用戶的使用門檻,在熟悉 Spark 之後,能夠快速上手 Spark Streaming。
4.2 缺點
Spark Streaming 是準實時的數據處理框架,採用粗粒度的處理方式,當 batch time 到時才會觸發計算,這並非像 Storm 那樣是純流式的數據處理方式。此種方式不可避免會出現相應的計算延遲 。
目前來看,Spark Streaming 穩定性方面還是會存在一些問題。有時會因一些莫名的異常導致退出,這種情況下得需要自己來保證數據一致性以及失敗重啟功能等。
四、總結
本篇文章主要介紹了 Spark Streaming 在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;並且重點關注了下 Spark Streaming 在監控方面所作的努力。
首先本文介紹了 Spark Streaming 應用場景以及在我們的實際應用中所採取的技術架構。在此基礎上,引入無狀態計算模型以及有狀態模型兩種計算模型;接著通過監聽器模式介紹 Spark UI 相關監控信息等;最後對 Spark Streaming 的優缺點進行概括。
大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。但是,學習人工智能要求也是很高的,必須是研究生以上,你才能躋身這個行業。
眼下呢,,,先學習好大數據~~~強大自身!
小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,
轉發關注小編,直接私信小編“學習”來進行獲取~~~~
作者:徐勝國
來源:數盟
Spark Streaming 是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制能夠滿足我們很多的場景應用。本篇結合我們的應用場景,介結我們在使用 Spark Streaming 方面的技術架構,並著重講解 Spark Streaming 兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接著介紹了 Spark Streaming 在監控方面所做的一些事情,最後總結了 Spark Streaming 的優缺點。
一、概述
數據是非常寶貴的資源,對各級企事業單均有非常高的價值。但是數據的爆炸,導致原先單機的數據處理已經無法滿足業務的場景需求。因此在此基礎上出現了一些優秀的分佈式計算框架,諸如 Hadoop、Spark 等。離線分佈式處理框架雖然能夠處理非常大量的數據,但是其遲滯性很難滿足一些特定的需求場景,比如 push 反饋、實時推薦、實時用戶行為等。為了滿足這些場景,使數據處理能夠達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有 Apache Storm、Apache Flink 以及 Spark Streaming 等。其中 Spark Streaming 由於其本身的擴展性、高吞吐量以及容錯能力等特性,並且能夠和離線各種框架有效結合起來,因而是當下是比較受歡迎的一種流式處理框架。
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特點。Spark Streaming 支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後可以用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在很多地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
Spark Streaming 其優秀的特點給我們帶來很多的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行為、用戶遷移等。本文中,將為大家詳細介紹,我們的應用場景中,Spark Streaming 的技術架構、兩種狀態模型以及 Spark Streaming 監控等。
二、應用場景
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集卻是逐條進行的,因此 Spark Streaming 系統需要設置間隔使得數據彙總到一定的量後再一併操作,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交作業的頻率和數據處理的延遲,同時也影響著數據處理的吞吐量和性能。
2.1 框架
目前我們 Spark Streaming 的業務應用場景包括異常監測、網頁點擊、用戶行為以及用戶地圖遷徙等場景。按計算模型來看大體可分為無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,我們採用Kafka作為實時輸入源,Spark Streaming 作為計算引擎處理完數據之後,再持久化到存儲中,包括 MySQL、HDFS、ElasticSearch 以及 MongoDB 等;同時 Spark Streaming 數據清洗後也會寫入 Kafka,然後經由 Flume 持久化到 HDFS;接著基於持久化的內容做一些 UI 的展現。架構見下圖:
2.2 無狀態模型
無狀態模型只關注當前新生成的 DStream 數據,所以的計算邏輯均基於該批次的數據進行處理。無狀態模型能夠很好地適應一些應用場景,比如網站點擊實時排行榜、指定 batch 時間段的用戶訪問以及點擊情況等。該模型由於沒有狀態,並不需要考慮有狀態的情況,只需要根據業務場景保證數據不丟就行。此種情況一般採用 Direct 方式讀取 Kafka 數據,並採用監聽器方式持久化 Offsets 即可。具體流程如下:
其上模型框架包含以下幾個處理步驟:
- 讀取 Kafka 實時數據;
- Spark Streaming Transformations 以及 actions 操作;
- 將數據結果持久化到存儲中,跳轉到步驟一。
受網絡、集群等一些因素的影響,實時程序出現長時失敗,導致數據出現堆積。此種情況下是丟掉堆積的數據從 Kafka largest 處消費還是從之前的 Kafka offsets處消費,這個取決具體的業務場景。
2.3 狀態模型
有狀態模型是指 DStreams 在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,可以是 2 個及以上的多個 batch time RDD 組成。Spark Streaming 提供了 updateStateByKey 方法來滿足此類的業務場景。因涉及狀態的問題,所以在實際的計算過程中需要保存計算的狀態,Spark Streaming 中通過 checkpoint 來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近 N batch time 的網站訪問情況以及 app 新增累計統計等等。具體流程如下:
上述流程中,每 batch time 計算時,需要依賴最近 2 個 batch time 內的數據,經過轉換及相關統計,最終持久化到 MySQL 中去。不過為了確保每個計算僅計算 2 個 batch time 內的數據,需要維護數據的狀態,清除過期的數據。我們先來看下 updateStateByKey 的實現,其代碼如下:
暴露了全局狀態數據中的 key 類型的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)}
隱藏了全局狀態數據中的 key 類型,僅對 Value 提供自定義的方法。
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner,initialRDD: RDD[(K, S)]): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)}
以上兩種方法分別給我們提供清理過期數據的思路:
- 泛型 K 進行過濾。K 表示全局狀態數據中對應的 key,如若 K 不滿足指定條件則反回 false;
- 返回值過濾。第二個方法中自定義函數指定了 Option[S] 返回值,若過期數據返回 None,那麼該數據將從全局狀態中清除。
三、Spark Streaming 監控
同 Spark 一樣,Spark Streaming 也提供了 Jobs、Stages、Storage、Enviorment、Executors 以及 Streaming 的監控,其中 Streaming 監控頁的內容如下圖:
上圖是 Spark UI 中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展現。另外除了提供上述數據監控外,Spark UI 還提供了 Active Batches 以及 Completed Batches 相關信息。Active Batches 包含當前正在處理的 batch 信息以及堆積的 batch 相關信息,而 Completed Batches 剛提供每個 batch 處理的明細數據,具體包括 batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:
Spark Streaming 能夠提供如此優雅的數據監控,是因在對監聽器設計模式的使用。如若 Spark UI 無法滿足你所需的監控需要,用戶可以定製個性化監控信息。 Spark Streaming 提供了 StreamingListener 特質,通過繼承此方法,就可以定製所需的監控,其代碼如下:
@DeveloperApitrait StreamingListener {/** Called when a receiver has been started */def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }/** Called when a receiver has reported an error */def onReceiverError(receiverError: StreamingListenerReceiverError) { }/** Called when a receiver has been stopped */def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }/** Called when a batch of jobs has been submitted for processing. */def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }/** Called when processing of a batch of jobs has started. */def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }/** Called when processing of a batch of jobs has completed. */def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }/** Called when processing of a job of a batch has started. */def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted) { }/** Called when processing of a job of a batch has completed. */def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }}
目前,我們保存 Offsets 時,採用繼承 StreamingListener 方式,此是一種應用場景。當然也可以監控實時計算程序的堆積情況,並在達到一閾值後發送報警郵件。具體監聽器的定製還得依據應用場景而定。
四、Spark Streaming 優缺點
Spark Streaming 並非是 Storm 那樣,其並非是真正的流式處理框架,而是一次處理一批次數據。也正是這種方式,能夠較好地集成 Spark 其他計算模塊,包括 MLlib(機器學習)、Graphx 以及 Spark SQL。這給實時計算帶來很大的便利,與此帶來便利的同時,也犧牲作為流式的實時性等性能。
4.1 優點
- Spark Streaming 基於 Spark Core API,因此其能夠與 Spark 中的其他模塊保持良好的兼容性,為編程提供了良好的可擴展性;
- Spark Streaming 是粗粒度的準實時處理框架,一次讀取完或異步讀完之後處理數據,且其計算可基於大內存進行,因而具有較高的吞吐量;
- Spark Streaming 採用統一的 DAG 調度以及 RDD,因此能夠利用其lineage 機制,對實時計算有很好的容錯支持;
- Spark Streaming 的 DStream 是基於 RDD 的在流式數據處理方面的抽象,其 transformations 以及 actions 有較大的相似性,這在一定程度上降低了用戶的使用門檻,在熟悉 Spark 之後,能夠快速上手 Spark Streaming。
4.2 缺點
Spark Streaming 是準實時的數據處理框架,採用粗粒度的處理方式,當 batch time 到時才會觸發計算,這並非像 Storm 那樣是純流式的數據處理方式。此種方式不可避免會出現相應的計算延遲 。
目前來看,Spark Streaming 穩定性方面還是會存在一些問題。有時會因一些莫名的異常導致退出,這種情況下得需要自己來保證數據一致性以及失敗重啟功能等。
四、總結
本篇文章主要介紹了 Spark Streaming 在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;並且重點關注了下 Spark Streaming 在監控方面所作的努力。
首先本文介紹了 Spark Streaming 應用場景以及在我們的實際應用中所採取的技術架構。在此基礎上,引入無狀態計算模型以及有狀態模型兩種計算模型;接著通過監聽器模式介紹 Spark UI 相關監控信息等;最後對 Spark Streaming 的優缺點進行概括。
大家一定要學好大數據,為人工智能做準備,好好的提升一下自己的大數據知識儲備,厚積薄發。但是,學習人工智能要求也是很高的,必須是研究生以上,你才能躋身這個行業。
眼下呢,,,先學習好大數據~~~強大自身!
小編這裡正好有一套大數據從入門到精通的視頻教程,希望大家能夠喜歡,,,,,
轉發關注小編,直接私信小編“學習”來進行獲取~~~~