国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

park將數(shù)據(jù)寫入hbase以及從hbase讀取數(shù)據(jù)

2019-11-11 04:31:55
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

本文將介紹

1、Spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset將RDD寫入Hbase

2、spark從hbase中讀取數(shù)據(jù)并轉(zhuǎn)化為RDD

操作方式為在eclipse本地運(yùn)行spark連接到遠(yuǎn)程的hbase。

java版本:1.7.0

Scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自帶zookeeper,選擇自己部署的)

Hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

集群:centos6.5_x64

將RDD寫入hbase

注意點(diǎn):

依賴:

將lib目錄下的hadoop開頭jar包、hbase開頭jar包添加至classpath

此外還有l(wèi)ib目錄下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少會(huì)提示hbase RpcRetryingCaller: Call exception不斷嘗試重連hbase,不報(bào)錯(cuò))、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目錄下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能會(huì)有相同名稱的類,不要導(dǎo)錯(cuò)

連接集群:

spark應(yīng)用需要連接到zookeeper集群,然后借助zookeeper訪問(wèn)hbase。一般可以通過(guò)兩種方式連接到zookeeper:

第一種是將hbase-site.xml文件加入classpath

第二種是在HBaseConfiguration實(shí)例中設(shè)置

如果不設(shè)置,默認(rèn)連接的是localhost:2181會(huì)報(bào)錯(cuò):connection refused 

本文使用的是第二種方式。

hbase創(chuàng)建表:

雖然可以在spark應(yīng)用中創(chuàng)建hbase表,但是不建議這樣做,最好在hbase shell中創(chuàng)建表,spark寫或讀數(shù)據(jù)

使用saveAsHadoopDataset寫入數(shù)據(jù)

package com.test    import org.apache.hadoop.hbase.HBaseConfiguration  import org.apache.hadoop.hbase.client.Put  import org.apache.hadoop.hbase.io.ImmutableBytesWritable  import org.apache.hadoop.hbase.ma從hbase讀取數(shù)據(jù)轉(zhuǎn)化成RDD

本例基于官方提供的例子

package com.test    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  import org.apache.hadoop.hbase.client.HBaseAdmin  import org.apache.hadoop.hbase.mapreduce.TableInputFormat  import org.apache.spark._  import org.apache.hadoop.hbase.client.HTable  import org.apache.hadoop.hbase.client.Put  import org.apache.hadoop.hbase.util.Bytes  import org.apache.hadoop.hbase.io.ImmutableBytesWritable  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  import org.apache.hadoop.mapred.JobConf  import org.apache.hadoop.io._    object TestHBase2 {      def main(args: Array[String]): Unit = {      val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")      val sc = new SparkContext(sparkConf)            val tablename = "account"      val conf = HBaseConfiguration.create()      //設(shè)置zooKeeper集群地址,也可以通過(guò)將hbase-site.xml導(dǎo)入classpath,但是建議在程序里這樣設(shè)置      conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")      //設(shè)置zookeeper連接端口,默認(rèn)2181      conf.set("hbase.zookeeper.property.clientPort", "2181")      conf.set(TableInputFormat.INPUT_TABLE, tablename)        // 如果表不存在則創(chuàng)建表      val admin = new HBaseAdmin(conf)      if (!admin.isTableAvailable(tablename)) {        val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))        admin.createTable(tableDesc)      }        //讀取數(shù)據(jù)并轉(zhuǎn)化成rdd      val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],        classOf[org.apache.hadoop.hbase.client.Result])        val count = hBaseRDD.count()      println(count)      hBaseRDD.foreach{case (_,result) =>{        //獲取行鍵        val key = Bytes.toString(result.getRow)        //通過(guò)列族和列名獲取列        val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))        val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))        println("Row key:"+key+" Name:"+name+" Age:"+age)      }}        sc.stop()      admin.close()    }  } 
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 荆门市| 云浮市| 利辛县| 云阳县| 汶川县| 泗洪县| 昌黎县| 高淳县| 蒲江县| 木兰县| 嘉兴市| 当雄县| 拉孜县| 宁明县| 福清市| 都兰县| 韶山市| 雅安市| 克什克腾旗| 富锦市| 丽江市| 从化市| 苍山县| 亚东县| 鹤岗市| 石门县| 苍梧县| 航空| 若尔盖县| 邹城市| 和平县| 富锦市| 普兰县| 重庆市| 张掖市| 绥江县| 佛坪县| 日土县| 榕江县| 重庆市| 襄樊市|