基於 Spark 的數據分析實踐

基於 Spark 的數據分析實踐

轉載本文需註明出處:微信公眾號EAWorld,違者必究。

引言:

Spark是在借鑑了MapReduce之上發展而來的,繼承了其分佈式並行計算的優點並改進了MapReduce明顯的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件。

本文主要分析了 Spark RDD 以及 RDD 作為開發的不足之處,介紹了 SparkSQL 對已有的常見數據系統的操作方法,以及重點介紹了普元在眾多數據開發項目中總結的基於 SparkSQL Flow 開發框架。

目錄:

一、Spark RDD

二、基於Spark RDD數據開發的不足

三、SparkSQL

四、SparkSQL Flow

一、Spark RDD

RDD(Resilient Distributed Dataset)叫做彈性分佈式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、元素可並行計算的集合。

RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。

//Scala 在內存中使用列表創建

val lines = List(“A”, “B”, “C”, “D” …)
val rdd:RDD = sc.parallelize(lines);

//以文本文件創建

val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”)

Spark RDD Partition 分區劃分

基於 Spark 的數據分析實踐

新版本的 Hadoop 已經把 BlockSize 改為 128M,也就是說每個分區處理的數據量更大。

Spark 讀取文件分區的核心原理

本質上,Spark 是利用了 Hadoop 的底層對數據進行分區的 API(InputFormat):

public abstract class InputFormat<K,V>{
public abstract List<InputSplit> getSplits(JobContextcontext
) throwsIOException,InterruptedException;

public abstract RecordReader<K,V> createRecordReader(InputSplitsplit,
TaskAttemptContextcontext
)throwsIOException,InterruptedException;
}

Spark 任務提交後通過對輸入進行 Split,在 RDD 構造階段,只是判斷是否可 Split(如果參數異常一定在此階段報出異常),並且 Split 後每個 InputSplit 都是一個分區。只有在Action 算子提交後,才真正用 getSplits 返回的 InputSplit 通過 createRecordReader 獲得每個 Partition 的連接。

然後通過 RecordReader 的 next() 遍歷分區內的數據。

Spark RDD 轉換函數和提交函數

基於 Spark 的數據分析實踐

Spark RDD 的眾多函數可分為兩大類Transformation 與 Action。Transformation 與 Action 的區別在於,對 RDD 進行 Transformation 並不會觸發計算:Transformation 方法所產生的 RDD 對象只會記錄住該 RDD 所依賴的 RDD 以及計算產生該 RDD 的數據的方式;只有在用戶進行 Action 操作時,Spark 才會調度 RDD 計算任務,依次為各個 RDD 計算數據。這就是 Spark RDD 內函數的“懶加載”特性。

二、基於Spark RDD數據開發的不足

由於MapReduce的shuffle過程需寫磁盤,比較影響性能;而Spark利用RDD技術,計算在內存中流式進行。另外 MapReduce計算框架(API)比較侷限, 使用需要關注的參數眾多,而Spark則是中間結果自動推斷,通過對數據集上鍊式執行函數具備一定的靈活性。

即使 SparkRDD 相對於 MapReduce 提高很大的便利性,但在使用上仍然有許多問題。體現在一下幾個方面:

  1. RDD 函數眾多,開發者不容易掌握,部分函數使用不當 shuffle時造成數據傾斜影響性能;
  2. RDD 關注點仍然是Spark太底層的 API,基於 Spark RDD的開發是基於特定語言(Scala,Python,Java)的函數開發,無法以數據的視界來開發數據;
  3. 對 RDD 轉換算子函數內部分常量、變量、廣播變量使用不當,會造成不可控的異常;
  4. 對多種數據開發,需各自開發RDD的轉換,樣板代碼較多,無法有效重利用;
  5. 其它在運行期可能發生的異常。如:對象無法序列化等運行期才能發現的異常。

三、SparkSQL

Spark 從 1.3 版本開始原有 SchemaRDD 的基礎上提供了類似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開發者的學習門檻,同時還支持Scala、Java與Python三種語言。更重要的是,由於脫胎自SchemaRDD,DataFrame天然適用於分佈式大數據場景。

基於 Spark 的數據分析實踐

一般的數據處理步驟:讀入數據 -> 對數據進行處理 -> 分析結果 -> 寫入結果

SparkSQL 結構化數據

  • 處理結構化數據(如 CSV,JSON,Parquet 等);
  • 把已經結構化數據抽象成 DataFrame (HiveTable);
  • 非結構化數據通過 RDD.map.filter 轉換成結構化進行處理;
  • 按照列式數據庫,只加載非結構化中可結構化的部分列(Hbase,MongoDB);

處理非結構化數據,不能簡單的用 DataFrame 裝載。而是要用 SparkRDD 把數據讀入,在通過一系列的 Transformer Method 把非結構化的數據加工為結構化,或者過濾到不合法的數據。

SparkSQL DataFrame


基於 Spark 的數據分析實踐

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎的分佈式數據集,類似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。如果熟悉 Python Pandas 庫中的 DataFrame 結構,則會對 SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

import.org.apache.spark.sql._
//定義數據的列名稱和類型
valdt=StructType(List(id:String,name:String,gender:String,age:Int))

//導入user_info.csv文件並指定分隔符
vallines = sc.textFile("/path/user_info.csv").map(_.split(","))

//將表結構和數據關聯起來,把讀入的數據user.csv映射成行,構成數據集
valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))

//通過SparkSession.createDataFrame()創建表,並且數據表表頭
val df= spark.createDataFrame(rowRDD, dt)

讀取規則數據文件作為DataFrame

SparkSession.Builder builder = SparkSession.builder()
Builder.setMaster("local").setAppName("TestSparkSQLApp")
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();

# 讀取 JSON 數據,path 可為文件或者目錄
valdf=sqlContext.read().json(path);

# 讀取 HadoopParquet 文件
vardf=sqlContext.read().parquet(path);

# 讀取 HadoopORC 文件
vardf=sqlContext.read().orc(path);

JSON 文件為每行一個 JSON 對象的文件類型,行尾無須逗號。文件頭也無須[]指定為數組;SparkSQL 讀取是隻是按照每行一條 JSON Record序列化;

Parquet文件

Configurationconfig = new Configuration();
ParquetFileReaderreader = ParquetFileReader.open(
HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));
Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();
String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata");

allFiedls 的值就是各字段的名稱和具體的類型,整體是一個json格式進行展示。

讀取 Hive 表作為 DataFrame

Spark2 API 推薦通過 SparkSession.Builder 的 Builder 模式創建 SparkContext。 Builder.getOrCreate() 用於創建 SparkSession,SparkSession 是 SparkContext 的封裝。

在Spark1.6中有兩個核心組件SQLcontext和HiveContext。SQLContext 用於處理在 SparkSQL 中動態註冊的表,HiveContext 用於處理 Hive 中的表。

從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執行 Hive 中的表,也可執行內部註冊的表;

在需要執行 Hive 表時,只需要在 SparkSession.Builder 中開啟 Hive 支持即可(enableHiveSupport())。

SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();
SparkSession spark = builder.getOrCreate();
SQLContext sqlContext = spark.sqlContext();

// db 指 Hive 庫中的數據庫名,如果不寫默認為 default

// tableName 指 hive 庫的數據表名

sqlContext.sql(“select * from db.tableName”)

SparkSQL ThriftServer

//首先打開 Hive 的 Metastore服務

hive$bin/hive –-service metastore –p 8093

//把 Spark 的相關 jar 上傳到hadoophdfs指定目錄,用於指定sparkonyarn的依賴 jar

spark$hadoop fs –put jars/*.jar /lib/spark2

// 啟動 spark thriftserver 服務

spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf 
spark.yarn.jars=hdfs:///lib/spark2/*.jar

當hdfs 上傳了spark 依賴 jar 時,通過spark.yarn.jars 可看到日誌 spark 無須每個job 都上傳jar,可節省啟動時間

19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar

//通過 spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)

bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop

-u 是指定 beeline 的執行驅動地址;

-n 是指定登陸到 spark Session 上的用戶名稱;

Beeline 還支持傳入-e 可傳入一行 SQL,

-e <query> query that should be executed

也可通過 –f 指定一個 SQL File,內部可用逗號分隔的多個 SQL(存儲過程)

-f <exec file> script file that should be executed

SparkSQL Beeline 的執行效果展示

基於 Spark 的數據分析實踐

SparkSQL ThriftServer

基於 Spark 的數據分析實踐

對於 SparkSQL ThriftServer 服務,每個登陸的用戶都有創建的 SparkSession,並且執行的對個 SQL 會通過時間順序列表展示。

SparkSQL ThriftServer 服務可用於其他支持的數據庫工具創建查詢,也用於第三方的 BI 工具,如 tableau。

四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 為基礎,開發的統一的基於 XML 配置化的可執行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個 Flow。下文開始 SparkSQL Flow 的介紹:

SparkSQL Flow 是基於 SparkSQL 開發的一種基於 XML 配置化的 SQL 數據流轉處理模型。該模型簡化了 SparkSQL 、Spark RDD的開發,並且降低開發了難度,適合瞭解數據業務但無法駕馭大數據以及 Spark 技術的開發者。

  • 一個由普元技術部提供的基於 SparkSQL 的開發模型;
  • 一個可二次定製開發的大數據開發框架,提供了靈活的可擴展 API;
  • 一個提供了 對文件,數據庫,NoSQL 等統一的數據開發視界語義;
  • 基於 SQL 的開發語言和 XML 的模板配置,支持 Spark UDF 的擴展管理;
  • 支持基於 Spark Standlone,Yarn,Mesos 資源管理平臺;
  • 支持開源、華為、星環等平臺統一認證。

SparkSQL Flow 適合的場景:

  1. 批量 ETL;
  2. 非實時分析服務;

SparkSQL Flow XML 概覽

基於 Spark 的數據分析實踐

  1. Properties 內定義一組變量,可用於宏替換;
  2. Methods 內可註冊 udf 和 udaf 兩種函數;
  3. Prepare 內可定義前置 SQL,用於執行 source 前的 sql 操作;
  4. Sources 內定義一個到多個數據表視圖;
  5. Transformer 內可定義 0 到多個基於 SQL 的數據轉換操作(支持 join);
  6. Targets 用於定義 1 到多個數據輸出;
  7. After 可定義 0到多個任務日誌;

如你所見,source 的 type 參數用於區分 source 的類型,source 支持的種類直接決定SparkSQL Flow 的數據源加載廣度;並且,根據 type 不同,source 也需要配置不同的參數,如數據庫還需要 driver,url,user和 password 參數。

Transformer 是基於 source 定的數據視圖可執行的一組轉換 SQL,該 SQL 符合 SparkSQL 的語法(SQL99)。Transform 的 SQL 的執行結果被作為中間表命名為 table_name 指定的值。

Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

SparkSQL Flow 支持的Sourse

基於 Spark 的數據分析實踐

  • 支持從 Hive 獲得數據;
  • 支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile
  • 支持RDBMS數據庫:PostgreSQL, MySQL,Oracle
  • 支持 NOSQL 數據庫:Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 為讀取文本文件,把文本文件每行按照 delimiter 指定的字符進行切分,切分不夠的列使用 null 填充。

<source type="textfile" table_name="et_rel_pty_cong"
fields="cust_id,name1,gender1,age1:int"
delimiter=","
path="file:///Users/zhenqin/software/hive/user.txt"/>
  1. Tablename 為該文件映射的數據表名,可理解為數據的視圖;
  2. Fields 為切分後的字段,使用逗號分隔,字段後可緊跟該字段的類型,使用冒號分隔;
  3. Delimiter 為每行的分隔符;
  4. Path 用於指定文件地址,可以是文件,也可是文件夾;
  5. Path 指定地址需要使用協議,如:file:// 、 hdfs://,否則跟 core-site.xml 配置密切相關;

SparkSQL Flow DB Source

<source type="mysql" table_name="et_rel_pty_cong"
table="user"
url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8"
driver="com.mysql.jdbc.Driver"
user="root" password="123456"/>

RDBMS 是從數據庫使用 JDBC讀取 數據集。支持 type 為:db、mysql、oracle、postgres、mssql;

  1. tablename 為該數據表的抽象 table 名稱(視圖);
  2. url、driver、user,password 為數據庫 JDBC 驅動信息,為必須字段;
  3. SparkSQL 會加載該表的全表數據,無法使用 where 條件。

SparkSQL Flow Transformer

<transform type="sql" table_name="cust_id_agmt_id_t" cached="true">
SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids
FROM user_concat_testx
group by c_phone,c_type,c_num
</transform>

Transform 支持 cached 屬性,默認為 false;如果設置為 true,相當於把該結果緩存到內存中,緩存到內存中的數據在後續其它 Transform 中使用能提高計算效率。但是需使用大量內存,開發者需要評估該數據集能否放到內存中,防止出現 OutofMemory 的異常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持輸出數據到一個或者多個目標。這些目標,基本覆蓋了 Source 包含的外部系統。下面以 Hive 舉例說明:

<target type="hive"
table_name="cust_id_agmt_id_t"
savemode=”append”
target_table_name="cust_id_agmt_id_h"/>
  1. table_name 為 source 或者 Transform 定義的表名稱;
  2. target_table_name 為 hive 中的表結果,Hive 表可不存在也可存在,sparksql 會根據 DataFrame 的數據類型自動創建表;
  3. savemode 默認為 overwrite 覆蓋寫入,當寫入目標已存在時刪除源表再寫入;支持 append 模式, 可增量寫入。

Target 有一個特殊的 show 類型的 target。用於直接在控制檯輸出一個 DataFrame 的結果到控制檯(print),該 target 用於開發和測試。

<target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/>

Rows 用於控制輸出多少行數據。

SparkSQL Around

After 用於 Flow 在運行結束後執行的一個環繞,用於記錄日誌和寫入狀態。類似 Java 的 try {} finally{ round.execute() }

多個 round 一定會執行,round 異常不會導致任務失敗。

<prepare>
<round type="mysql"
sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at)
values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())"
url="${jdbc.url}" .../>
</prepare>
<after>
<round type="mysql"
sql="update cpic_task_history set
end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}"
url="${jdbc.url}”…/>
</after>

Prepare round 和 after round 配合使用可用於記錄 SparkSQL Flow 任務的運行日誌。

SparkSQL Around可使用的變量

基於 Spark 的數據分析實踐

SparkSQL Around的執行效果

基於 Spark 的數據分析實踐

Prepare round 可做插入(insert)動作,after round 可做更新 (update)動作,相當於在數據庫表中從執行開始到結束有了完整的日誌記錄。SparkSQL Flow 會保證round 一定能被執行,而且 round 的執行不影響任務的狀態。

SparkSQL Flow 提交

bin/spark-submit --master yarn-client --driver-memory 1G \
--num-executors 10 --executor-memory 2G \
--jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar \
--conf spark.yarn.jars=hdfs:///lib/spark2/*.jar \
--queue default --name FlowTest \
etl-flow-0.2.0.jar -f hive-flow-test.xml
基於 Spark 的數據分析實踐

接收必須的參數 –f,可選的參數為支持 Kerberos 認證的租戶名稱principal,和其認證需要的密鑰文件。

usage: spark-submit --jars etl-flow.jar --class
com.yiidata.etl.flow.source.FlowRunner
-f,--xml-file <arg> Flow XML File Path
--keytabFile <arg> keytab File Path(Huawei)
--krb5File <arg> krb5 File Path(Huawei)
--principal <arg> principal for hadoop(Huawei)

SparkSQL Execution Plan

基於 Spark 的數據分析實踐

每個Spark Flow 任務本質上是一連串的 SparkSQL 操作,在 SparkUI SQL tab 裡可以看到 flow 中重要的數據表操作。

regiserDataFrameAsTable 是每個 source 和 Transform 的數據在 SparkSQL 中的數據視圖,每個視圖都會在 SparkContex 中註冊一次。

對RegisterDataFrameAsTable的分析

基於 Spark 的數據分析實踐

通過單個 regiserDataFrameAsTable 項進行分析,SparkSQL 並不是把source 的數據立即計算把數據放到內存,而是每次執行 source 時只是生成了一個 Logical Plan,只有遇到需要提交的算子(Action),SparkSQL 才會觸發前面所依賴的的 plan 執行。

總結

這是一個開發框架,不是一個成熟的產品,也不是一種架構。他只是基於 SparkSQL 整合了大多數的外部系統,能通過 XML 的模板配置完成數據開發。面向的是理解數據業務但不瞭解 Spark 的數據開發人員。整個框架完成了大多數的外部系統對接,開發者只需要使用 type 獲得數據,完成數據開發後通過 target 回寫到目標系統中。整個過程基本無須程序開發,除非當前的 SQL 函數無法滿足使用的情況下,需要自行開發一下特定的 UDF。因此本框架在對 SparkSQL 做了二次開發基礎上,大大簡化了 Spark 的開發,可降低了開發者使用難度。


關於作者:震秦,普元資深開發工程師,專注於大數據開發 8 年,擅長 Hadoop 生態內各工具的使用和優化。參與某公關廣告(上市)公司DMP 建設,負責數據分層設計和批處理,調度實現,完成交付使用;參與國內多省市公安社交網絡項目部署,負責產品開發(Spark 分析應用);參與數據清洗加工為我方主題庫並部署上層應用。

關於EAWorld:微服務,DevOps,數據治理,移動架構原創技術分享。

相關推薦

推薦中...