"
""
- 7. Spark RDD的高級算子
(1) mapPartitionsWithIndex
- 把每個partition中的分區號和對應的值拿出來
- def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
- f中函數參數:
- 第一個參數是Int,代表分區號
- 第二個Iterator[T]代表分區中的元素
- e.g.: 將每個分區中的元素和分區號打印出來
- val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
- 創建一個函數返回RDD中的每個分區號和元素:
def func1(index:Int, iter:Iterator[Int]):Iterator[String] ={
iter.toList.map( x => "[PartID:" + index + ", value=" + x + "]" ).iterator
}
複製代碼
- 調用:rdd1.mapPartitionsWithIndex(func1).collect
(2) aggregate
- 先對局部聚合,再對全局聚合
- e.g.: val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
- 查看每個分區中的元素:
scala> rdd1.mapPartitionsWithIndex(fun1).collect
\tres4: Array[String] = Array(
\t[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
\t[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
複製代碼
- 將每個分區中的最大值求和,注意初始值是0:
scala> rdd2.aggregate(0)(max(_,_),_+_)
\tres6: Int = 7
複製代碼
- 如果初始值時候100,則結果為300:
scala> rdd2.aggregate(100)(max(_,_),_+_)
\tres8: Int = 300
```
複製代碼
- 如果是求和,注意初始值是0:
scala> rdd2.aggregate(0)(_+_,_+_)
\tres9: Int = 15
複製代碼
- 如果初始值是10,則結果是45
scala> rdd2.aggregate(10)(_+_,_+_)
\tres10: Int = 45
複製代碼
- e.g. —— 字符串:
- val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
- 修改一下剛才的查看分區元素的函數
def func2(index: Int, iter: Iterator[(String)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
複製代碼
- 兩個分區中的元素:
[partID:0, val: a], [partID:0, val: b], [partID:0, val: c],
[partID:1, val: d], [partID:1, val: e], [partID:1, val: f]
複製代碼
- 運行結果:
- e.g.:
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
複製代碼
- 結果可能是24,也可能是42
val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
複製代碼
- 結果是10,也可能是01
- 原因:注意有個初始值"",其長度0,然後0.toString變成字符串
val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
複製代碼
- 結果是11,原因同上。
(3) aggregateByKey
- 準備數據:
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
複製代碼
- 兩個分區中的元素:
- e.g.:
- 將每個分區中的動物最多的個數求和
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res69: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
複製代碼
- 將每種動物個數求和
scala> pairRDD.aggregateByKey(0)(_+_, _ + _).collect
res71: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
複製代碼
- 這個例子也可以使用:reduceByKey
scala> pairRDD.reduceByKey(_+_).collect
res73: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
複製代碼
(4) coalesce與repartition
- 都是將RDD中的分區進行重分區。
- 區別:
- coalesce默認不會進行shuffle(false);
- repartition會進行shuffle(true),會將數據真正通過網絡進行重分區。
- e.g.:
def func4(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
下面兩句話是等價的:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3,true) -> 如果是false,查看RDD的length依然是2
複製代碼
(5) 其他高級算子
- 參考:homepage.cs.latrobe.edu.au/zhe/ZhenHeS…
8. Spark 基礎編程案例
(1) 求網站的訪問量
- Tomcat的訪問日誌如下:
- 需求:找到訪問量最高的兩個網頁,要求顯示網頁名稱和訪問量
- 步驟分析:
- <1>. 對網頁的訪問量求和
- <2>. 降序排序
- 代碼:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TomcatLogCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("TomcatLogCount")
val sc = new SparkContext(conf)
/*
* 讀入日誌並解析
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
* */
val rdd1 = sc.textFile(" ").map(
line => {
//解析字符串,得到jsp的名字
//1. 解析兩個引號間的字符串
val index1 = line.indexOf("\\"")
val index2 = line.lastIndexOf("\\"")
//line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
val line1 = line.substring(index1 + 1, index2)
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
//line2 = /MyDemoWeb/oracle.jsp
val line2 = line1.substring(index3 + 1, index4)
//得到jsp的名字 oracle.jsp
val jspName = line2.substring(line2.lastIndexOf("/"))
(jspName, 1)
}
)
//統計每個jsp的次數
val rdd2 = rdd1.reduceByKey(_+_)
//使用Value排序
val rdd3 = rdd2.sortBy(_._2, false)
//得到次數最多的兩個jsp
rdd3.take(2).foreach(println)
sc.stop()
}
}
複製代碼
(2) 創建自定義分區
- 根據jsp文件的名字,將各自的訪問日誌放入到不同的分區文件中,如下:
- 生成的分區文件
- 如:part-00000文件中的內容:只包含了web.jsp的訪問日誌
- 代碼:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.HashMap
object TomcatLogPartitioner {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("TomcatLogPartitioner")
val sc = new SparkContext(conf)
/*
* 讀入日誌並解析
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
* */
val rdd1 = sc.textFile(" ").map(
line => {
//解析字符串,得到jsp的名字
//1. 解析兩個引號間的字符串
val index1 = line.indexOf("\\"")
val index2 = line.lastIndexOf("\\"")
//line1 = GET /MyDemoWeb/oracle.jsp HTTP/1.1
val line1 = line.substring(index1 + 1, index2)
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
//line2 = /MyDemoWeb/oracle.jsp
val line2 = line1.substring(index3 + 1, index4)
//得到jsp的名字 oracle.jsp
val jspName = line2.substring(line2.lastIndexOf("/"))
(jspName, line)
}
)
//得到不重複的jsp名字
val rdd2 = rdd1.map(_._1).distinct().collect()
//創建分區規則
val wepPartitioner = new WepPartitioner(rdd2)
val rdd3 = rdd1.partitionBy(wepPartitioner)
//輸出rdd3
rdd3.saveAsTextFile(" ")
}
//定義分區規則
class WepPartitioner(jspList : Array[String]) extends Partitioner {
/*
* 定義集合來保存分區條件:
* String 代表jsp的名字
* Int 代表序號
* */
val partitionMap = new HashMap[String, Int]()
//初始分區號
val partID = 0
//填值
for (jsp <- jspList) {
patitionMap.put(jsp, partID)
partID += 1
}
//返回分區個數
def numPartitioners : Int = partitionMap.size
//根據jsp,返回對應的分區
def getPartition(key : Any) : Int = partitionMap.getOrElse(key.toString(), 0)
}
}
複製代碼
(3) 使用JDBCRDD 訪問數據庫
- JdbcRDD參數說明:
- 從上面的參數說明可以看出,JdbcRDD有以下兩個缺點:
- <1>. 執行的SQL必須有兩個參數,並類型都是Long
- <2>. 得到的結果是ResultSet,即:只支持select操作
- 代碼:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
/*
* 把Spark結果存放到mysql數據庫中
*
*/
object TomcatLogCountToMysql {
def main(args: Array[String]): Unit = {
//創建SparkContext
val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCountToMysql")
val sc = new SparkContext(conf)
/*
*
* 讀入日誌 解析:
*
* 192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
*/
val rdd1 = sc.textFile("H:\\\\tmp_files\\\\localhost_access_log.txt")
.map(
line => {
//解析字符串,得到jsp的名字
//1、解析兩個引號之間的字符串
val index1 = line.indexOf("\\"")
val index2 = line.lastIndexOf("\\"")
val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1
//得到兩個空格的位置
val index3 = line1.indexOf(" ")
val index4 = line1.lastIndexOf(" ")
val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp
//得到jsp的名字
val jspName = line2.substring(line2.lastIndexOf("/")) // oracle.jsp
(jspName, 1)
})
//
// try {
// /*
// * create table mydata(jsname varchar(50),countNumber Int)
// *
// * foreach 沒有返回值,在本需求中,只需要寫數據庫,不需要返回新的RDD,所以用foreach即可
// *
// *
// * 運行 Task not serializable
// */
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
// pst = conn.prepareStatement("insert into mydata values(?,?)")
//
// rdd1.foreach(f => {
// pst.setString(1, f._1)
// pst.setInt(2, f._2)
//
// pst.executeUpdate()
// })
// } catch {
// case t: Throwable => t.printStackTrace()
// } finally {
// if (pst != null) pst.close()
// if (conn != null) conn.close()
// }
//
// sc.stop()
// //存入數據庫
// var conn: Connection = null
// var pst: PreparedStatement = null
// //第一種修改方法
// /*
// * 修改思路:
// * conn pst 讓每一個節點都是用到,需要在不同的節點上傳輸,實現sericalizable接口
// */
// try {
// rdd1.foreach(f => {
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
// pst = conn.prepareStatement("insert into mydata values(?,?)")
//
// pst.setString(1, f._1)
// pst.setInt(2, f._2)
//
// pst.executeUpdate()
// })
// } catch {
// case t: Throwable => t.printStackTrace()
// } finally {
// if (pst != null) pst.close()
// if (conn != null) conn.close()
// }
//
// sc.stop()
/*
* 第一種修改方式,功能上可以實現,但每條數據都會創建連接,對數據庫造成很大壓力
*
* 針對分區來操作:一個分區,建立一個連接即可
*/
rdd1.foreachPartition(saveToMysql)
sc.stop()
}
def saveToMysql(it: Iterator[(String, Int)]) = {
var conn: Connection = null
var pst: PreparedStatement = null
try {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
pst = conn.prepareStatement("insert into mydata values(?,?)")
it.foreach(f => {
pst.setString(1, f._1)
pst.setInt(2, f._2)
pst.executeUpdate()
})
} catch {
case t: Throwable => t.printStackTrace()
} finally {
if (pst != null) pst.close()
if (conn != null) conn.close()
}
}
}
如果您喜歡這個系列的文章請多多點贊轉發評論收藏。如果有什麼想知道想了解的,也可以評論告訴我。
相關推薦
推薦中...