軟件版本如下:
scala2.11.8
spark2.1.0
hbase1.2.0
公司有一些實時數據處理的項目,存儲用的是hbase,提供實時的檢索,當然hbase裡面存儲的數據模型都是簡單的,複雜的多維檢索的結果是在es裡面存儲的,公司也正在引入Kylin作為OLAP的數據分析引擎,這塊後續有空在研究下。
接著上面說的,hbase存儲著一些實時的數據,前兩週新需求需要對hbase裡面指定表的數據做一次全量的update以滿足業務的發展,平時操作hbase都是單條的curd,或者插入一個批量的list,用的都是hbase的java api比較簡單,但這次涉及全量update,所以如果再用原來那種單線程的操作api,勢必速度回慢上許多。
關於批量操作Hbase,一般我們都會用MapReduce來操作,這樣可以大大加快處理效率,原來也寫過MR操作Hbase,過程比較繁瑣,最近一直在用scala做spark的相關開發,所以就直接使用scala+spark來搞定這件事了,當然底層用的還是Hbase的TableOutputFormat和TableOutputFormat這個和MR是一樣的,在spark裡面把從hbase裡面讀取的數據集轉成rdd了,然後做一些簡單的過濾,轉化,最終在把結果寫入到hbase裡面。
整個流程如下:
(1)全量讀取hbase表的數據
(2)做一系列的ETL
(3)把全量數據再寫回hbase
核心代碼如下:
//獲取conf val conf=HBaseConfiguration.create //設置讀取的表 conf.set(TableInputFormat.INPUT_TABLE,tableName) //設置寫入的表 conf.set(TableOutputFormat.OUTPUT_TABLE,tableName) //創建sparkConf val sparkConf=new SparkConf //設置spark的任務名 sparkConf.setAppName("read and write for hbase ") //創建spark上下文 val sc=new SparkContext(sparkConf) //為job指定輸出格式和輸出表名 val newAPIJobConfiguration1 = Job.getInstance(conf) newAPIJobConfiguration1.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName) newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) //全量讀取hbase表 val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat] ,classOf[ImmutableBytesWritable] ,classOf[Result] ) //過濾空數據,然後對每一個記錄做更新,並轉換成寫入的格式 val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas) //轉換後的結果,再次做過濾 val save_rdd=final_rdd.filter(checkNull) //最終在寫回hbase表 save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration) sc.stop
從上面的代碼可以看出來,使用spark+scala操作hbase是非常簡單的。下面我們看一下,中間用到的幾個自定義函數:
第一個:checkNotEmptyKs
作用:過濾掉空列簇的數據
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={ val r=f._2 val rowkey=Bytes.toString(r.getRow) val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala if(map.isEmpty) false else true }
第二個:forDatas
作用:讀取每一條數據,做update後,在轉化成寫入操作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //獲取Result val put:Put=new Put(r.getRow) //聲明put val ks=Bytes.toBytes("ks") //讀取指定列簇 val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala map.foreach(kv=>{//遍歷每一個rowkey下面的指定列簇的每一列的數據做轉化 val kid= Bytes.toString(kv._1)//知識點id var value=Bytes.toString(kv._2)//知識點的value值 value="修改後的value" put.addColumn(ks,kv._1,Bytes.toBytes(value))//放入put對象 } ) if(put.isEmpty) null else (new ImmutableBytesWritable,put) }
第三個:checkNull
作用:過濾最終結果裡面的null數據
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={ if(f==null) false else true }
上面就是整個處理的邏輯了,需要注意的是對hbase裡面的無效數據作過濾,跳過無效數據即可,邏輯是比較簡單的,代碼量也比較少。
除了上面的方式,還有一些開源的框架,也封裝了相關的處理邏輯,使得spark操作hbase變得更簡潔,有興趣的朋友可以瞭解下,github鏈接如下: