spark系列:Spark Streaming官方文檔譯文

概述

spark stream是對spark core api的擴展;對於spark core不太瞭解的請閱讀:spark系列:spark core 數據交互技術點(數據模型)。所以本質上是通過批處理來模擬流處理。

spark stream的流數據源可以來自Kafka, Flume, Kinesis, 或 TCP sockets甚至是文件。

spark系列:Spark Streaming官方文檔譯文

spark stream

對於流數據可以做很多複雜的處理(只有想不到,沒有做不到的),如map操作、reduce操作、join操作,甚至是在線訓練機器學習模型等等。

最終處理完之後的數據可以寫入各種文件系統,如HDFS、數據庫等等。

內部機制

spark stream模塊接收流數據,並按照時間維度將其分割成一段段的小量的批數據,然後通過spark core引擎來處理。

spark系列:Spark Streaming官方文檔譯文

批處理模擬流處理

對外提供的接口本質上是對離散小批量數據(discretized stream or DStream)的處理來模擬的流數據。

spark系列:Spark Streaming官方文檔譯文

基本概念

jar依賴

開發spark stream流處理程序,需要添加如下依賴:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>

對於外部數據源是kafka、Flume、Flume的,需要額外添加如下依賴:

spark系列:Spark Streaming官方文檔譯文

jar 依賴

初始化上下文

val spark = SparkSession.builder()
.appName("wordCount")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

關鍵參數:Seconds(2) 是流數據的窗口長度;

接入數據源

val lines = ssc.textFileStream("/home/panteng/桌面/stream")//文件
val lines = ssc.socketTextStream("localhost", 9999)//TcpSocket

定義計算流程

/**
* DEMO:每隔6秒鐘統計最近30秒的數據,每隔1分鐘存儲一次
* 應用場景:每天更新用戶最近30天的行為數據
*
* @param ssc StreamingContext
*/
def socketStreamWindow(ssc: StreamingContext): Unit = {
val lines = ssc.socketTextStream("localhost", 9999)
val statistics = lines.flatMap(_.split(" "))
.map(w => (w, 1))
.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(6))
statistics.print()
statistics.window(Minutes(1), Minutes(1))
.repartition(1).saveAsTextFiles("/home/xxx/IdeaProjects/hadoop-ecological/output/time")
ssc.start()
ssc.awaitTermination()
}

啟動

ssc.start()

一旦啟動之後,就不能在定義新的處理流程。

上下文被關閉,則不能夠重新啟動

一個JVM虛擬機中只能有一個StreamingContext處於活躍(active)狀態

默認情況下關閉StreamingContext,也會關閉sparkContext;可以通過參數設置只關閉前者。

核心概念(Dstream)

Dstream是由一系列連續的RDD構成的有序集合。每一個RDD代表一段固定長度時間間隔內的數據。

spark系列:Spark Streaming官方文檔譯文

Dtream

對Dtream的操作實際上是對一系列的RDD操作。

spark系列:Spark Streaming官方文檔譯文

接收者(Receivers)

receiver的作用是從流數據源接受數據,並存儲曹spark應用的內存中,等待後期處理。

一般來講,在一個worker/excutor中,一個數據流對應一個receiver,如果一個worker有多個數據流,那麼需要對應多個receiver,這是必須保證worker的core數量大於receiver的數量,否則只能接受數據,但不能處理。

Transform操作

transform是將一個RDD轉成另一個RDD的操作,接受一個rdd->rdd的函數,並且可以引用外部的rdd。好神奇的一個操作,

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}

窗口操作

兩個關鍵參數窗口長度和移動長度。並且提供了與窗口有關的一系列曹組。如:reduceByWindow、reduceByKeyAndWindow等。

spark系列:Spark Streaming官方文檔譯文

Window Operations

spark系列文章,歡迎關注查看:

spark系列:spark core 數據交互技術點(數據模型)

spark系列:RDD、DataSet、DataFrame的區別

spark系列:spark生態組件與應用場景

spark系列:常見問題TOP5及解決方案

相關推薦

推薦中...