成都创新互联网站制作重庆分公司

怎么用Spark读取HBASE数据

这篇文章主要讲解了“怎么用Spark读取HBASE数据”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用Spark读取HBASE数据”吧!

“专业、务实、高效、创新、把客户的事当成自己的事”是我们每一个人一直以来坚持追求的企业文化。 成都创新互联是您可以信赖的网站建设服务商、专业的互联网服务提供商! 专注于成都做网站、网站设计、软件开发、设计服务业务。我们始终坚持以客户需求为导向,结合用户体验与视觉传达,提供有针对性的项目解决方案,提供专业性的建议,创新互联建站将不断地超越自我,追逐市场,引领市场!

scala访问HBASE通常2种方式,一种是使用SPARK方式读取HBASE数据直接转换成RDD, 一种采用和JAVA类似的方式,通过HTable操作HBASE,数据获取之后再自己进行处理。 这2种方式区别应该是RDD是跑在多节点通过从HBASE获取数据,而采用HTable的方式,应该是串行了,仅仅是HBASE层面是分布式而已。

1. 转换为RDD
package com.isesol.spark
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.spark._
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp


object hbasescan {


  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("this is for spark SQL")
    //conf.setSparkHome("d:\\spark_home")
    val hbaseconf = HBaseConfiguration.create()
    hbaseconf.set("hbase.zookeeper.quorum", "datanode01.isesol.com,datanode02.isesol.com,datanode03.isesol.com,datanode04.isesol.com,cmserver.isesol.com")
    hbaseconf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseconf.set("maxSessionTimeout", "6")
    val sc = new SparkContext(conf)
    try {
      println("start to read from hbase")
      val hbaseContext = new HBaseContext(sc, hbaseconf)
      val scan = new Scan()
      scan.setMaxVersions()
      //scan.setRowPrefixFilter(Bytes.toBytes("i51530048-1007-9223370552914159518"))
      scan.setCaching(100)
      val filter = new SingleColumnValueFilter(Bytes.toBytes("cf"), Bytes.toBytes("age"), CompareOp.LESS, Bytes.toBytes("1"));
      scan.setFilter(filter)
      val hbaserdd = hbaseContext.hbaseRDD(TableName.valueOf("bank"), scan)
      hbaserdd.cache()
      println(hbaserdd.count())
    } catch {
      case ex: Exception => println("can not connect hbase")
    }
  }
}

2. 采用 HTable方式处理

    val htable = new HTable(hbaseconf, "t_device_fault_statistics")
    val scan1 = new Scan()
    scan1.setCaching(3*1024*1024)
    val scaner = htable.getScanner(scan1)
    
    while(scaner.iterator().hasNext()){
       val result = scaner.next()
       if(result.eq(null)){
       } else {
         println(Bytes.toString(result.getRow) + "\t" + Bytes.toString(result.getValue("cf".getBytes, "fault_level2_name".getBytes)))
       }
    }
    scaner.close()
    htable.close()

感谢各位的阅读,以上就是“怎么用Spark读取HBASE数据”的内容了,经过本文的学习后,相信大家对怎么用Spark读取HBASE数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


网站栏目:怎么用Spark读取HBASE数据
本文来源:http://cxhlcq.com/article/jhjcho.html

其他资讯

在线咨询

微信咨询

电话咨询

028-86922220(工作日)

18980820575(7×24)

提交需求

返回顶部