Storm,Spark和Hadoop MapReduce之間有什麼關係呢?
接下來是咱們的Spark的詳細筆記:
高階函數:
如果一個函數的參數類型為一個函數,那麼此函數稱為高階函數(high-level)
如何確定一個函數:
-1,參數
個數、類型、順序
-2,返回值
def map[B](f: (A) => B): List[B]
f: (A) => B
表示的是函數
f:
表示的函數的名稱
(A):
函數的參數類型
B:
函數的返回值類型
val list = List(1, 2, 3, 4)
val list2 = list.map(_ * _)// 裝X模式
list.map(num => {
num * num
})
面向對象
-1,構造方法
主構造方法
class Person()
附屬構造方法
def this()
必須最終調用主構造方法
-2,object
單例
所有的變量和方法都是靜態的static
main
-3,伴生對象伴生類
如果一個類,還對應一個object
apply
創建class時更加簡單
-4,Case Class - JavaBean
case class People(name: String, age: Int)
隱式轉換
偷龍轉鳳
將某個類的對象轉換為另外一個類的對象。
核心:
函數
隱式轉換函數 implicit
隱式參數
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
大數據改變世界,Spark改變大數據
-1,大數據
以HADOOP 2.x為主的分佈式計算框架,處理海量數據,並行的進行計算
MapReduce - Shuffle過程中中間結果數據的處理
分而治之
map
分 -> 磁盤(中間分析結果,中間緩存數據)
reduce
合
-2,Spark
大數據起源於搜索,發展於電商。
Spark
類比MapReduce框架,都是處理數據,分析數據
並行計算框架
編程思想:
Step 1:
讀取要處理要分析的數據 -> 內存
封裝
集合(RDD : Resilient Distributed Dataset) 數據結構
分區的 - hdfs block
每個分區進行數據處理,函數- RDD
Step 2:
分析處理數據
RDD#transformation
Step 3:
將分析數據分結果進行存儲
RDD#action
RDD
函數(方法)
-1,處理數據 函數
Transformation 轉換
-2,存儲或者返回
Action
count\take(5)\saveAsTextFile("")\foreach(item => { })
Spark
實時內存框架
批處理(batch procssing)交互式處理流處理
MapReduce\HiveImpala\Kylin Storm\JStorm
SparkCoreSparkSQLSparkStreaming
RDD SQL\DSL
Spark 核心編程
Spark 高級分析
Spark MLlib
Spark GraphX
Spark 發展:
加州大學 伯克利 分校 AMPLab 實驗室
AMP(機器學習,人工智能)
A:
算法
M:
機器
P:
人
Databrick
金磚
Apache Spark is
a fast and general engine for large-scale data processing.
-1,處理海量數據
並行計算框架
-2,fast
對比 MapReduce
要數據的數據,如果放在內存中的話,要快100X
如果放在硬盤上的話,快10倍
-3,general
Runs Everywhere
Spark runs on Hadoop, Mesos, standalone, or in the cloud.
-1,Spark 開發的程序
運行在哪裡?
-i,本地測試
-ii,集群模式
YARN
“數據操作系統Data Operate System”
“雲操作系統”
Mesos
類比於YARN
Standalone
Spark 框架自身一個資源調度管理框架
類比於YARN
It can access diverse data sources including HDFS, Cassandra, HBase, and S3.
-2,處理數據的來源
HDFS、HBase、。。。。
xml jdbc(mysql,oracle,db2),JSON,parquet,ES
Flume\Kafka
第一個數據
阿里巴巴 每日使用Spark處理的數據量 1PB(1024TB)
第二個數據
騰訊目前全球範圍內Spark 集群機器數目最多 2000+(8000+)
Spark 核心編程
三個框架全部將我們要處理的數據封裝成”集合“
SparkCore
RDD
SparkContext
val rdd = sc.textFile("/user/beifeng/mapreduce/wordcount/input")
讀數據:HDFS
SparkSQL
DataFrame(1.3.x)/Dataset(2.x)
讀數據:Hive
SparkStreaming
DStream
讀取數據:Kafka
Spark 學習資料
-1,官方文檔
http://spark.apache.org/docs/1.6.1/
-2,Spark源碼
https://github.com/apache/spark
-i,關聯IDE
IDEA
-ii,Import IDEA
Maven 倉庫
-3,官方博客
https://databricks.com/blog
-4,英文書籍
中文書籍
注意一點:
Spark 來說,下載源碼,針對對應的HADOOP版本以及其他框架版本進行編譯
HADOOP版本
APACHE
CDH
在實際的企業中如何做的呢?
CM/CDH
CM方式通過parcels方式安裝CDH組件
依據Spark源碼,針對對應CDH的HADOOP、HIVE版本進行編譯。
Google後Hadoop時代的新“三駕馬車”——Caffeine、Pregel、Dremel
Dremel
Impala
SparkSQL
Drill
管理項目工程的JAR包
JAVA
Maven
SCALA
SBT
針對APACHE HADOOP 編譯
./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0 -Phive -Phive-thriftserver
針對CDH版本的 HADOOP 編譯
./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.6 -Phive -Phive-thriftserver
Maven 鏡像源 - 開源中國 - 僅僅針對APACHE版本
<mirror>
<id>nexus-osc</id>
<mirrorOf>*</mirrorOf>
<name>Nexus osc</name>
<url>http://maven.oschina.net/content/groups/public/</url>
</mirror>
Spark 開發
spark-shell
交互式命令行
local mode
基本配置:
-1, JDK
JAVA_HOME
-2, SCALA
SCALA_HOME
-3, 配置信息
啟動spark-shell
bin/spark-shell
日誌信息:
16/11/30 05:42:19 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
16/11/30 05:42:18 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/11/30 05:42:18 INFO server.AbstractConnector: Started [email protected]:4040
16/11/30 05:42:18 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
16/11/30 05:42:18 INFO ui.SparkUI: Started SparkUI at http://192.168.217.191:4040
啟動命令的時候,也啟動一個WEB UI監控頁面
scala> val rdd = sc.textFile("README.md")
rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[3] at textFile at <console>:27
底層的原理:
如何讀取HDFS數據呢???
按照MapReduce讀取文件的方式進行的
默認情況下MapReduce讀取HDFS
一行一行的讀取,將每一行的數據變為Key,Value對的形式
Key:
每行數據的所在文件的偏移量
Value:
每一行的值
scala> rdd.count
scala> rdd.first
scala> rdd.take(1)
scala> val filterRdd = rdd.filter(line => line.contains("Spark"))
val pageViewsRdd=sc.textFile("user/hive/warehouse/db_hive.db/page_views/page_views.data")
pageViewsRdd.map(line=>(line.split("\t")(2),1)).reduceBykey(_+_).map(tuple.2,tuple._1).sortByKey(flase).take(10).map(tuple._2,tuple._1)
val list = List(1,2,3,4,5)
list.reduce(_ + _)
=
list.reduce((num01: Int, num02: Int) => num01 + num02)
num01num02
第一次1 2= 3
第二次3 3= 6
第三次6 4= 10
數據分析統計:
-1,清洗過濾
-2,分組word
-3,統計count
-4,排序top
-5,Top
在Hive中模式,本地模式:
MapReduce是否運行在本地,而不是YARN上。
hive.exec.mode.local.auto=false
什麼情況下,會啟用呢?會自動的轉換為本地模式呢?
當要處理的Hive表的數據小於一個hdfs block的大小的時候。
Spark 應用運行
bin/spark-submit
-1,本地模式
local
-2,集群模式
yarn
standalone
mesos
[beifeng@hadoop-senior01 spark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell --help
Usage: ./bin/spark-shell [options]
Options:
--master MASTER_URL
spark://host:port, mesos://host:port, yarn, or local.
bin/spark-shell --master local[2]
在Spark中每個Task的運行僅僅以Thread方式運行,每個Task運行需要1 CORE CPU。
Spark Standalone
屬於Spark自身自帶的分佈式資源管理和任務調度框架(Spark Application)
主節點YARN
Master ResourceManager
從節點
WorksNodeManagers
-1,內存
Memory
-2,CPU CORES
JVM PROCESS
配置:
${SPARK_HOME}/conf/spark-env.sh
SPARK_MASTER_IP=hadoop-senior01.ibeifeng.com
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_WORKER_INSTANCES=1
${SPARK_HOME}/conf/slaves
hadoop-senior01.ibeifeng.com
啟動:
配置主節點的機器到集群中的各個機器的SSH 無密鑰登錄,一定要配置
主節點:
sbin/start-master.sh
從節點:
sbin/start-slaves.sh
測試:
運行spark-shell在Standalone集群環境下
bin/spark-shell --master spark://hadoop-senior01.ibeifeng.com:7077
Spark Application 運行在集群(YARN、Standalone、Mesos)
-1,Driver Program
理解為運行在YARN上的每個應用的AM
負責任務運行時,資源的申請和任務的調度
就是運行main的方法
SparkContext
-i,讀取數據,創建RDD
-ii,調度任務
-2,Executors
運行在Work上的進程JVM,運行Task(Thread)
在每個Executor中會運行多個Task任務,每個Task都是以Thread方式運行。
面試:
一個Executor中並行運行多少個Task呢?由誰決定的呢?
作業:
在IDEA中,基於Spark Application模塊編程WordCount程序並測試。
想諮詢更多內容,可加群(Q):131322610