'Linux環境Spark安裝配置及使用(六)'

"
"
Linux環境Spark安裝配置及使用(六)

12. 認識 Spark Streaming

(1) Spark Streaming 簡介

  • 流式計算框架(類似於Storm)
  • 常用的實時計算引擎(流式計算)
  • <1>. Apache Storm:真正的流式計算
  • <2>. Spark Streaming :嚴格上來說不是真正的流式計算(實時計算),把連續的流式數據,當成不連續的RDD,本質是一個離散計算(不連續)
  • <3>. Apache Flink:真正的流式計算,與Spark Streaming相反, 把離散的數據,當成流式數據來處理
  • <4>. JStorm
  • Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.(易於構建靈活的、高容錯的流式系統)
  • Spark Streaming是核心Spark API的擴展,可實現可擴展、高吞吐量、可容錯的實時數據流處理。數據可以從諸如Kafka,Flume,Kinesis或TCP套接字等眾多來源獲取,並且可以使用由高級函數(如map,reduce,join和window)開發的複雜算法進行流數據處理。最後,處理後的數據可以被推送到文件系統,數據庫和實時儀表板。而且,還可以在數據流上應用Spark提供的機器學習和圖處理算法。

(2) Spark Streaming 的特點

  • <1>. 易用,已經集成到Spark中
  • <2>. 容錯性:底層RDD,RDD本身具有容錯機制
  • <3>. 支持多種語言:Java Scala Python

(3) Spark Streaming的內部結構

  • 在內部,它的工作原理如下。Spark Streaming接收實時輸入數據流,並將數據切分成批,然後由Spark引擎對其進行處理,最後生成“批”形式的結果流。
  • Spark Streaming將連續的數據流抽象為discretizedstream或DStream。在內部DStream 由一個RDD序列表示。

13. Spark Streaming 基礎

(1) Spark Streaming 官方示例

  • <1>. 介紹:
  • 向Spark Streaming中發送字符串,Spark 接收到以後進行計數
  • <2>. 準備工作:
  • netcat網絡工具(yum install nc.x86_64)
  • **注意:**總核數大於等於2,一個核心用於接收數據,另一個用於處理數據
  • <3>. 操作:
  • 啟動同一Linux系統的兩個窗口,一個負責輸入,一個負責監聽
  • 窗口1:nc -l 1234 (-l監聽模式;1234端口號)
  • 窗口2:run-example streaming.NetworkWordCount localhost 1234
  • 在窗口1輸入文本信息,窗口2監聽並進行計數統計

(2) 自寫 Spark Streaming 官方示例

  • MyNetworkWordCount.scala
/**
*
* @ClassName: MyNetworkWordCount
* @Description
* @Author: YBCarry
* @Date2019-05-13 20:49
* @Version: V1.0
*
**/
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.internal.Logging
/*
* 自寫流式計算程序
*
* 知識點:
* 1、創建一個StreamingContext對象 --> 核心:創建一個DStream
* 2、DStream的表現形式:就是一個RDD
* 3、使用DStream把連續的數據流變成不連續的RDD
*
* spark Streaming 最核心的內容
*/
object MyNetworkWordCount {
def main(args: Array[String]): Unit = {


//創建一個Streaming Context對象
//local[2] 表示開啟了兩個線程
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Seconds(3) 表示採樣時間間隔
val ssc = new StreamingContext(conf, Seconds(3))

//創建DStream 從netcat服務器上接收數據
val lines = ssc.socketTextStream("172.16.194.128", 1234, StorageLevel.MEMORY_ONLY)

//lines中包含了netcat服務器發送過來的數據
//分詞操作
val words = lines.flatMap(_.split(" "))

//計數
val wordPair = words.transform(x => x.map(x => (x, 1)))

//打印結果
wordPair.print()

//啟動StreamingContext 進行計算
ssc.start()

//等待任務結束
ssc.awaitTermination()

}
}
複製代碼

14. Spark Streaming 進階

(1) StreamingContext對象詳解

  • 初始化StreamingContext:
  • 方式一:從SparkConf對象中創建:
  • 方式二:從一個現有的SparkContext實例中創建
  • 程序中的幾點說明:
  • appName參數是應用程序在集群UI上顯示的名稱。
  • master是Spark,Mesos或YARN集群的URL,或者一個特殊的“local [*]”字符串來讓程序以本地模式運行。
  • 當在集群上運行程序時,不需要在程序中硬編碼master參數,而是使用spark-submit提交應用程序並將master的URL以腳本參數的形式傳入。但是,對於本地測試和單元測試,您可以通過“local[*]”來運行Spark Streaming程序(請確保本地系統中的cpu核心數夠用)。
  • StreamingContext會內在的創建一個SparkContext的實例(所有Spark功能的起始點),你可以通過ssc.sparkContext訪問到這個實例。
  • 批處理的時間窗口長度必須根據應用程序的延遲要求和可用的集群資源進行設置。
  • 注意:
  • 一旦一個StreamingContextt開始運作,就不能設置或添加新的流計算。
  • 一旦一個上下文被停止,它將無法重新啟動。
  • 同一時刻,一個JVM中只能有一個StreamingContext處於活動狀態。
  • StreamingContext上的stop()方法也會停止SparkContext。 要僅停止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選參數stopSparkContext設置為false。
  • 只要前一個StreamingContext在下一個StreamingContext被創建之前停止(不停止SparkContext),SparkContext就可以被重用來創建多個StreamingContext。

(2) 離散流(DStreams):Discretized Streams

  • 把連續的數據變成不連續的RDD
  • 因為DStream的特性,導致,Spark Streaming不是真正的流式計算
  • DiscretizedStream或DStream 是Spark Streaming對流式數據的基本抽象。它表示連續的數據流,這些連續的數據流可以是從數據源接收的輸入數據流,也可以是通過對輸入數據流執行轉換操作而生成的經處理的數據流。在內部,DStream由一系列連續的RDD表示,如下圖:
  • 舉例分析:在之前的NetworkWordCount的例子中,我們將一行行文本組成的流轉換為單詞流,具體做法為:將flatMap操作應用於名為lines的DStream中的每個RDD上,以生成words DStream的RDD。如下圖所示:
  • 但是DStream和RDD也有區別,下面畫圖說明:

(3) 轉換操作(transformation)

  • transform(func)
  • 通過RDD-to-RDD函數作用於源DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD
  • 舉例:在NetworkWordCount中,也可以使用transform來生成元組對
  • updateStateByKey(func)
  • 操作允許不斷用新信息更新它的同時保持任意狀態。
  • 定義狀態:狀態可以是任何的數據類型
  • 定義狀態更新函數:怎樣利用更新前的狀態和從輸入流裡面獲取的新值更新狀態
  • 重寫NetworkWordCount程序,累計每個單詞出現的頻率(注意:累計)
  • TotalNetworkWordCount.scala
 package test.Network

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
*
* @ClassName: TotalNetworkWordCount
* @Description: 實現累加操作
* @Author: YBCarry
* @Date2019-05-15 16:05
* @Version: V1.0
*
**/
object TotalNetworkWordCount {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

//創建一個Streaming Context對象
//local[2] 表示開啟了兩個線程
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Seconds(3) 表示採樣時間間隔
val ssc = new StreamingContext(conf, Seconds(3))

//設置檢查點目錄,保存之前都的狀態信息
ssc.checkpoint("")

//創建DStream
val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)

//分割
val words = lines.flatMap(_.split(" "))

//計數
// val wordPair = words.map((_, 1))
val wordPair = words.transform( x => x.map(x => (x, 1)))

//定義一個值函數 ;累加計數
/*
* 接收兩個參數
* currentValues —— 當前值
* previousValue ——歷史值
* */
val addFunc = (currentValues : Seq[Int], previousValues : Option[Int]) => {

//累加當前的序列
val currrentTotal = currentValues.sum
//累加歷史值
Some(currrentTotal + previousValues.getOrElse(0))
}

//累加運算
val total = wordPair.updateStateByKey(addFunc)

total.print()

ssc.start()

ssc.awaitTermination()
}
}
複製代碼

複製代碼

(4) 窗口操作

  • Spark Streaming還提供了窗口計算功能,允許在數據的滑動窗口上應用轉換操作。下圖說明了滑動窗口的工作方式:
  • 如圖所示,每當窗口滑過originalDStream時,落在窗口內的源RDD被組合並被執行操作以產生windowed DStream的RDD。在上面的例子中,操作應用於最近3個時間單位的數據,並以2個時間單位滑動。這表明任何窗口操作都需要指定兩個參數。
  • 窗口長度(windowlength) - 窗口的時間長度(上圖的示例中為:3)。
  • 滑動間隔(slidinginterval) - 兩次相鄰的窗口操作的間隔(即每次滑動的時間長度)(上圖示例中為:2)。
  • 這兩個參數必須是源DStream的批間隔的倍數(上圖示例中為:1)。
  • e.g.: 假設對之前的單詞計數的示例進行擴展,每10秒鐘對過去30秒的數據進行wordcount。則在最近30秒的pairs DStream數據中對(word, 1)鍵值對應用reduceByKey操作。這是通過使用reduceByKeyAndWindow操作完成的。
 package test.NetworkByWindow

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
*
* @ClassName: NetworkWordCountByWindow
* @Description: 每10秒讀取過去30秒的數據
* @Author: YBCarry
* @Date2019-05-15 17:00
* @Version: V1.0
*
**/
object NetworkWordCountByWindow {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

//創建一個Streaming Context對象
//local[2] 表示開啟了兩個線程
val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//Seconds(3) 表示採樣時間間隔
val ssc = new StreamingContext(conf, Seconds(3))

//設置檢查點目錄,保存之前都的狀態信息
ssc.checkpoint("")

//創建DStream
val lines = ssc.socketTextStream("bigdata01", 1234, StorageLevel.MEMORY_ONLY)

//分割 每個單詞計數
val words = lines.flatMap(_.split(" ")).map((_, 1))

/*
* 窗口操作
* 參數說明:要進行的操作 窗口的大小(30s) 窗口移動距離(12s) ——> 採樣時間(3)的整數倍
* */
val result = words.reduceByKeyAndWindow((x : Int, y : Int) => (x + y), Seconds(30), Seconds(12))
}

}
複製代碼

15. Spark 數據源

(1) 輸入DStreams和接收器

  • 輸入DStreams表示從數據源獲取輸入數據流的DStreams。在NetworkWordCount例子中,lines表示輸入DStream,它代表從netcat服務器獲取的數據流。每一個輸入流DStream和一個Receiver對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。
  • 輸入DStreams表示從數據源獲取的原始數據流。Spark Streaming擁有兩類數據源:
  • 基本源(Basic sources):這些源在StreamingContext API中直接可用。例如文件系統、套接字連接、Akka的actor等
  • 高級源(Advanced sources):這些源包括Kafka,Flume,Kinesis,Twitter等等。
  • 下面通過具體的案例,詳細說明:

(2) 基本源

  • <1>. 文件流:通過監控文件系統的變化,若有新文件添加,則將它讀入並作為數據流
  • 注意:
  • ① 這些文件具有相同的格式
  • ② 這些文件通過原子移動或重命名文件的方式在dataDirectory創建
  • ③ 如果在文件中追加內容,這些追加的新數據也不會被讀取。
  • Spark Streaming監控一個文件夾,如果有變化,則把變化採集過來
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
*
* @ClassName: FileStreaming
* @Description
* @Author: YBCarry
* @Date2019-05-16 09:24
* @Version: V1.0
*
**/
object FileStreaming {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

//創建一個Streaming Context對象
//local[2] 表示開啟了兩個線程
val conf = new SparkConf().setAppName("MyFileStreaming").setMaster("local[2]")
//Seconds(3) 表示採樣時間間隔
val ssc = new StreamingContext(conf, Seconds(10))

//監控目錄,讀取產生的新文件
val lines = ssc.textFileStream("\\\\Users\\\\apple\\\\學習\\\\SparkFiles")

lines.print()

ssc.start()
ssc.awaitTermination()

}

}
複製代碼
  • 注意:需要在原文件中編輯,然後拷貝一份。
  • <2>. RDD隊列流
  • 使用streamingContext.queueStream(queueOfRDD)創建基於RDD隊列的DStream,用於調試Spark Streaming應用程序。
package test.RDDQueue


import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.Queue

/**
*
* @ClassName: RDDQueueStream
* @Description: RDD隊列流
* @Author: YBCarry
* @Date2019-05-16 10:48
* @Version: V1.0
*
**/
object RDDQueueStream {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

//創建一個Streaming Context對象
//local[2] 表示開啟了兩個線程
val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
//Seconds(3) 表示採樣時間間隔
val ssc = new StreamingContext(conf, Seconds(3))

//創建隊列 RDD[Int]
val rddQueue = new Queue[RDD[Int]]()

//向隊列裡添加數據 (創建數據源)
for (i <- 1 to 3) {

rddQueue += ssc.sparkContext.makeRDD(1 to 10)

//便於觀察
Thread.sleep(1000)
}

//從隊列中接收數據,創建DStream
val inputDStream = ssc.queueStream(rddQueue)

//處理數據
val result = inputDStream.map(x => (x, x * 2))
result.print()

ssc.start()
ssc.awaitTermination()

}

}
複製代碼
  • <3>. 套接字流:通過監聽Socket端口來接收數據

(3) 高級源

  • <1>. Spark Streaming接收Flume數據
  • a. 基於Flume的Push模式:
  • Flume被用於在Flume agents之間推送數據,在這種方式下,Spark Streaming可以很方便的建立一個receiver,起到一個Avro agent的作用。Flume可以將數據推送到改receiver。
  • 以下為配置步驟:
  • **第一步:**Flume的配置文件
  • MyFlumeStream01.conf
#定義agent名, source、channel、sink的名稱
a4.sources = r1
a4.channels = c1
a4.sinks = k1

#具體定義source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /usr/local/tmp_files/logs

#具體定義channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

#具體定義sink
a4.sinks = k1
a4.sinks.k1.type = avro
a4.sinks.k1.channel = c1
a4.sinks.k1.hostname = bigdata01
a4.sinks.k1.port = 1234

#組裝source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
複製代碼
  • **第二步:**Spark Streaming程序
package test.Flume

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
*
* @ClassName: MyFlumeStream
* @Description: flume將數據推送給Spark Streaming 使用push
* @Author: YBCarry
* @Date2019-05-16 14:01
* @Version: V1.0
*
**/
object MyFlumeStream01 {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

//創建一個Streaming Context對象
//local[2] 表示開啟了兩個線程
val conf = new SparkConf().setAppName("MyRDDQueueStream").setMaster("local[2]")
//Seconds(3) 表示採樣時間間隔
val ssc = new StreamingContext(conf, Seconds(3))

//對接Flume
//創建一個Flumeevent從flume中接收puch來的數據(也是DStream)
//flume將數據push到localhost:1234,Spark Stream在這裡監聽
val flumeEventDStream = FlumeUtils.createStream(ssc, "bigdata01", 1234)

//將Flumeevent中的事件轉換成字符串
val lineDStream = flumeEventDStream.map(e => {
new String(e.event.getBody.array)
})

//輸出結果
lineDStream.print()

ssc.start()
ssc.awaitTermination()

}
}
複製代碼
  • **第三步:**測試
  • 啟動Flume
  • flume-ng agent -n a4 -f Spark/MyFlumeStream01.conf -c conf -Dflume.root.logger=INFO,console
  • 啟動Spark Streaming程序
  • 拷貝日誌文件到/root/training/logs目錄
  • 觀察輸出,採集到數據
  • b. 基於Custom Sink的Pull模式
  • 不同於Flume直接將數據推送到Spark Streaming中,第二種模式通過以下條件運行一個正常的Flume sink。Flume將數據推送到sink中,並且數據保持buffered狀態。Spark Streaming使用一個可靠的Flume接收器和轉換器從sink拉取數據。只要當數據被接收並且被Spark Streaming備份後,轉換器才運行成功。
  • 這樣,與第一種模式相比,保證了很好的健壯性和容錯能力,這種模式需要為Flume配置一個正常的sink。
  • 以下為配置步驟:
  • **第一步:**Flume的配置文件
  • FlumeLogPull.conf
 a1.channels = c1
a1.sinks = k1
a1.sources = r1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/tmp_files/logs

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = bigdata01
a1.sinks.k1.port = 1234

#組裝source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

複製代碼
  • **第二步:**Spark Streaming程序
複製代碼
  • package test.Flume
  • import org.apache.spark.streaming.StreamingContext import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.storage.StorageLevel import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.streaming.flume.FlumeUtils
  • /** *
  • @ClassName: FlumePutSink
  • @Description: 測試pull方式 使用Spark sink
  • @Author: YBCarry
  • @Date2019-05-16 15:23
  • @Version: V1.0
  • **/ object FlumeLogPull {
  • def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //創建一個Streaming Context對象 //local[2] 表示開啟了兩個線程 val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]") //Seconds(3) 表示採樣時間間隔 val ssc = new StreamingContext(conf,Seconds(3))
//創建FlumeEvent的DStream,採用pull的方式
val flumeEvent = FlumeUtils.createPollingStream(ssc, "172.16.194.128",1234, StorageLevel.MEMORY_ONLY)

//將FlumeEvent的事件準換成字符串
val lineDStream = flumeEvent.map( e => {
new String(e.event.getBody.array)
})

//輸出結果
lineDStream.print()

ssc.start()
ssc.awaitTermination()
}
複製代碼
  • }
複製代碼
  • **第三步:**需要的jar包
  • 將spark-streaming-flume-sink_2.11-2.1.0.jar拷貝到Flume的lib目錄下。
  • **第四步:**測試
  • 啟動Flume
  • 啟動Spark Streaming程序
  • 將測試數據拷貝到/root/training/logs
  • 觀察輸出

如果您喜歡這個系列的文章,請多多轉發評論收藏點贊,關注我後續還會有更精彩有趣的內容分享給你,如果有什麼想知道的也可以評論告訴我。私信我“學習”即可獲取詳盡的視頻教學資料。

"

相關推薦

推薦中...