'Linux環境Spark安裝配置及使用(三)'

Spark Linux Scala 全能架構師 2019-08-06
"
"
Linux環境Spark安裝配置及使用(三)

  • 7. Spark RDD的高級算子

(1) mapPartitionsWithIndex

  • 把每個partition中的分區號和對應的值拿出來
  • def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
  • f中函數參數:
  • 第一個參數是Int,代表分區號
  • 第二個Iterator[T]代表分區中的元素
  • e.g.: 將每個分區中的元素和分區號打印出來
  • val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
  • 創建一個函數返回RDD中的每個分區號和元素:
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
複製代碼
  • 調用:rdd1.mapPartitionsWithIndex(func1).collect

(2) aggregate

  • 先對局部聚合,再對全局聚合
  • e.g.: val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
  • 查看每個分區中的元素:
scala> rdd1.mapPartitionsWithIndex(fun1).collect
\tres4: Array[String] = Array(
\t[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
\t[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
複製代碼
  • 將每個分區中的最大值求和,注意初始值是0:
scala> rdd2.aggregate(0)(max(_,_),_+_)
\tres6: Int = 7
複製代碼
  • 如果初始值時候100,則結果為300:
scala> rdd2.aggregate(100)(max(_,_),_+_)
\tres8: Int = 300
```
複製代碼
  • 如果是求和,注意初始值是0:
scala> rdd2.aggregate(0)(_+_,_+_)
\tres9: Int = 15
複製代碼
  • 如果初始值是10,則結果是45
scala> rdd2.aggregate(10)(_+_,_+_)
\tres10: Int = 45
複製代碼
  • e.g. —— 字符串:
  • val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
  • 修改一下剛才的查看分區元素的函數
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
複製代碼
  • 兩個分區中的元素:
[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
複製代碼
  • 運行結果:
  • e.g.:
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
複製代碼
  • 結果可能是24,也可能是42
val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
複製代碼
  • 結果是10,也可能是01
  • 原因:注意有個初始值"",其長度0,然後0.toString變成字符串
val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
複製代碼
  • 結果是11,原因同上。

(3) aggregateByKey

  • 準備數據:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
複製代碼
  • 兩個分區中的元素:
  • e.g.:
  • 將每個分區中的動物最多的個數求和
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
複製代碼
  • 將每種動物個數求和
scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
複製代碼
  • 這個例子也可以使用:reduceByKey
scala> pairRDD.reduceByKey(_+_).collect
res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
複製代碼

(4) coalesce與repartition

  • 都是將RDD中的分區進行重分區。
  • 區別:
  • coalesce默認不會進行shuffle(false);
  • repartition會進行shuffle(true),會將數據真正通過網絡進行重分區。
  • e.g.:
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

下面兩句話是等價的:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3,true) -> 如果是false,查看RDD的length依然是2
複製代碼

(5) 其他高級算子

  • 參考:homepage.cs.latrobe.edu.au/zhe/ZhenHeS…

8. Spark 基礎編程案例

(1) 求網站的訪問量

  • Tomcat的訪問日誌如下:
  • 需求:找到訪問量最高的兩個網頁,要求顯示網頁名稱和訪問量
  • 步驟分析:
  • <1>. 對網頁的訪問量求和
  • <2>. 降序排序
  • 代碼:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TomcatLogCount {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName("TomcatLogCount")
val sc = new SparkContext(conf)

/*
* 讀入日誌並解析
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
* */

val rdd1 = sc.textFile(" ").map(
line => {
//解析字符串,得到jsp的名字
//1. 解析兩個引號間的字符串
val index1 = line.indexOf("\\"")
val index2 = line.lastIndexOf("\\"")
//line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
val line1 = line.substring(index1 + 1, index2)

val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
//line2 = /MyDemoWeb/oracle.jsp
val line2 = line1.substring(index3 + 1, index4)

//得到jsp的名字 oracle.jsp
val jspName = line2.substring(line2.lastIndexOf("/"))

(jspName, 1)
}
)
//統計每個jsp的次數
val rdd2 = rdd1.reduceByKey(_+_)

//使用Value排序
val rdd3 = rdd2.sortBy(_._2, false)

//得到次數最多的兩個jsp
rdd3.take(2).foreach(println)

sc.stop()
}
}
複製代碼

(2) 創建自定義分區

  • 根據jsp文件的名字,將各自的訪問日誌放入到不同的分區文件中,如下:
  • 生成的分區文件
  • 如:part-00000文件中的內容:只包含了web.jsp的訪問日誌
  • 代碼:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.HashMap


object TomcatLogPartitioner {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local").setAppName("TomcatLogPartitioner")
val sc = new SparkContext(conf)

/*
* 讀入日誌並解析
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
* */

val rdd1 = sc.textFile(" ").map(
line => {
//解析字符串,得到jsp的名字
//1. 解析兩個引號間的字符串
val index1 = line.indexOf("\\"")
val index2 = line.lastIndexOf("\\"")
//line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
val line1 = line.substring(index1 + 1, index2)

val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
//line2 = /MyDemoWeb/oracle.jsp
val line2 = line1.substring(index3 + 1, index4)

//得到jsp的名字 oracle.jsp
val jspName = line2.substring(line2.lastIndexOf("/"))

(jspName, line)
}
)

//得到不重複的jsp名字
val rdd2 = rdd1.map(_._1).distinct().collect()

//創建分區規則
val wepPartitioner = new WepPartitioner(rdd2)
val rdd3 = rdd1.partitionBy(wepPartitioner)

//輸出rdd3
rdd3.saveAsTextFile(" ")

}

//定義分區規則
class WepPartitioner(jspList : Array[String]) extends Partitioner {

/*
* 定義集合來保存分區條件:
* String 代表jsp的名字
* Int 代表序號
* */

val partitionMap = new HashMap[String, Int]()
//初始分區號
val partID = 0
//填值
for (jsp <- jspList) {
patitionMap.put(jsp, partID)
partID += 1
}

//返回分區個數
def numPartitioners : Int = partitionMap.size

//根據jsp,返回對應的分區
def getPartition(key : Any) : Int = partitionMap.getOrElse(key.toString(), 0)

}

}
複製代碼

(3) 使用JDBCRDD 訪問數據庫

  • JdbcRDD參數說明:
  • 從上面的參數說明可以看出,JdbcRDD有以下兩個缺點:
  • <1>. 執行的SQL必須有兩個參數,並類型都是Long
  • <2>. 得到的結果是ResultSet,即:只支持select操作
  • 代碼:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

/*
* 把Spark結果存放到mysql數據庫中
*
*/

object TomcatLogCountToMysql {
def main(args: Array[String]): Unit = {
//創建SparkContext
val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCountToMysql")

val sc = new SparkContext(conf)

/*
*
* 讀入日誌 解析:
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
*/

val rdd1 = sc.textFile("H:\\\\tmp_files\\\\localhost_access_log.txt")
.map(
line => {
//解析字符串,得到jsp的名字
//1、解析兩個引號之間的字符串
val index1 = line.indexOf("\\"")
val index2 = line.lastIndexOf("\\"")
val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1

//得到兩個空格的位置
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp

//得到jsp的名字
val jspName = line2.substring(line2.lastIndexOf("/")) // oracle.jsp

(jspName, 1)
})

//
// try {
// /*
// * create table mydata(jsname varchar(50),countNumber Int)
// *
// * foreach 沒有返回值,在本需求中,只需要寫數據庫,不需要返回新的RDD,所以用foreach即可
// *
// *
// * 運行 Task not serializable
// */
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
// pst = conn.prepareStatement("insert into mydata values(?,?)")
//
// rdd1.foreach(f => {
// pst.setString(1, f._1)
// pst.setInt(2, f._2)
//
// pst.executeUpdate()
// })
// } catch {
// case t: Throwable => t.printStackTrace()
// } finally {
// if (pst != null) pst.close()
// if (conn != null) conn.close()
// }
//
// sc.stop()
// //存入數據庫
// var conn: Connection = null
// var pst: PreparedStatement = null

// //第一種修改方法
// /*
// * 修改思路:
// * conn pst 讓每一個節點都是用到,需要在不同的節點上傳輸,實現sericalizable接口
// */
// try {
// rdd1.foreach(f => {
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
// pst = conn.prepareStatement("insert into mydata values(?,?)")
//
// pst.setString(1, f._1)
// pst.setInt(2, f._2)
//
// pst.executeUpdate()
// })
// } catch {
// case t: Throwable => t.printStackTrace()
// } finally {
// if (pst != null) pst.close()
// if (conn != null) conn.close()
// }
//
// sc.stop()

/*
* 第一種修改方式,功能上可以實現,但每條數據都會創建連接,對數據庫造成很大壓力
*
* 針對分區來操作:一個分區,建立一個連接即可
*/
rdd1.foreachPartition(saveToMysql)
sc.stop()

}

def saveToMysql(it: Iterator[(String, Int)]) = {
var conn: Connection = null
var pst: PreparedStatement = null

try {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
pst = conn.prepareStatement("insert into mydata values(?,?)")

it.foreach(f => {

pst.setString(1, f._1)
pst.setInt(2, f._2)

pst.executeUpdate()
})
} catch {
case t: Throwable => t.printStackTrace()
} finally {
if (pst != null) pst.close()
if (conn != null) conn.close()
}
}

}

如果您喜歡這個系列的文章請多多點贊轉發評論收藏。如果有什麼想知道想了解的,也可以評論告訴我。

"

相關推薦

推薦中...