如何使用scala+spark讀寫hbase?

編程語言 HBase Scala Spark 科技優家 2017-06-14

軟件版本如下:

scala2.11.8

spark2.1.0

hbase1.2.0

公司有一些實時數據處理的項目,存儲用的是hbase,提供實時的檢索,當然hbase裡面存儲的數據模型都是簡單的,複雜的多維檢索的結果是在es裡面存儲的,公司也正在引入Kylin作為OLAP的數據分析引擎,這塊後續有空在研究下。

接著上面說的,hbase存儲著一些實時的數據,前兩週新需求需要對hbase裡面指定表的數據做一次全量的update以滿足業務的發展,平時操作hbase都是單條的curd,或者插入一個批量的list,用的都是hbase的java api比較簡單,但這次涉及全量update,所以如果再用原來那種單線程的操作api,勢必速度回慢上許多。

關於批量操作Hbase,一般我們都會用MapReduce來操作,這樣可以大大加快處理效率,原來也寫過MR操作Hbase,過程比較繁瑣,最近一直在用scala做spark的相關開發,所以就直接使用scala+spark來搞定這件事了,當然底層用的還是Hbase的TableOutputFormat和TableOutputFormat這個和MR是一樣的,在spark裡面把從hbase裡面讀取的數據集轉成rdd了,然後做一些簡單的過濾,轉化,最終在把結果寫入到hbase裡面。

整個流程如下:

(1)全量讀取hbase表的數據

(2)做一系列的ETL

(3)把全量數據再寫回hbase

核心代碼如下:



//獲取conf
 val conf=HBaseConfiguration.create
  //設置讀取的表
  conf.set(TableInputFormat.INPUT_TABLE,tableName)
  //設置寫入的表
  conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
//創建sparkConf    
   val sparkConf=new SparkConf
   //設置spark的任務名
   sparkConf.setAppName("read and write for hbase ")
   //創建spark上下文
   val sc=new SparkContext(sparkConf)
   
   //為job指定輸出格式和輸出表名
   
    val newAPIJobConfiguration1 = Job.getInstance(conf)
    newAPIJobConfiguration1.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
   
   
   //全量讀取hbase表
      val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
      ,classOf[ImmutableBytesWritable]
      ,classOf[Result]
    )
   
   //過濾空數據,然後對每一個記錄做更新,並轉換成寫入的格式
    val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
    
    //轉換後的結果,再次做過濾
    val save_rdd=final_rdd.filter(checkNull)
    
    //最終在寫回hbase表
 save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
    sc.stop
 

從上面的代碼可以看出來,使用spark+scala操作hbase是非常簡單的。下面我們看一下,中間用到的幾個自定義函數:

第一個:checkNotEmptyKs

作用:過濾掉空列簇的數據

  def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={
    val r=f._2
    val rowkey=Bytes.toString(r.getRow)
    val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala
    if(map.isEmpty)  false else true
  }

第二個:forDatas

作用:讀取每一條數據,做update後,在轉化成寫入操作


  def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={
      val r=f._2 //獲取Result
      val put:Put=new Put(r.getRow) //聲明put
      val ks=Bytes.toBytes("ks") //讀取指定列簇
      val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
      map.foreach(kv=>{//遍歷每一個rowkey下面的指定列簇的每一列的數據做轉化
 val kid= Bytes.toString(kv._1)//知識點id
 var value=Bytes.toString(kv._2)//知識點的value值
value="修改後的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value))//放入put對象
      }
      )
    if(put.isEmpty)  null  else (new ImmutableBytesWritable,put)

  }

第三個:checkNull

作用:過濾最終結果裡面的null數據

  def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={
    if(f==null)  false  else true
  }

上面就是整個處理的邏輯了,需要注意的是對hbase裡面的無效數據作過濾,跳過無效數據即可,邏輯是比較簡單的,代碼量也比較少。

除了上面的方式,還有一些開源的框架,也封裝了相關的處理邏輯,使得spark操作hbase變得更簡潔,有興趣的朋友可以瞭解下,github鏈接如下:

相關推薦

推薦中...