国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

park將數據寫入hbase以及從hbase讀取數據

2019-11-11 04:30:06
字體:
來源:轉載
供稿:網友

本文將介紹

1、Spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset將RDD寫入Hbase

2、spark從hbase中讀取數據并轉化為RDD

操作方式為在eclipse本地運行spark連接到遠程的hbase。

java版本:1.7.0

Scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自帶zookeeper,選擇自己部署的)

Hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

集群:centos6.5_x64

將RDD寫入hbase

注意點:

依賴:

將lib目錄下的hadoop開頭jar包、hbase開頭jar包添加至classpath

此外還有lib目錄下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少會提示hbase RpcRetryingCaller: Call exception不斷嘗試重連hbase,不報錯)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目錄下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能會有相同名稱的類,不要導錯

連接集群:

spark應用需要連接到zookeeper集群,然后借助zookeeper訪問hbase。一般可以通過兩種方式連接到zookeeper:

第一種是將hbase-site.xml文件加入classpath

第二種是在HBaseConfiguration實例中設置

如果不設置,默認連接的是localhost:2181會報錯:connection refused 

本文使用的是第二種方式。

hbase創建表:

雖然可以在spark應用中創建hbase表,但是不建議這樣做,最好在hbase shell中創建表,spark寫或讀數據

使用saveAsHadoopDataset寫入數據

package com.test    import org.apache.hadoop.hbase.HBaseConfiguration  import org.apache.hadoop.hbase.client.Put  import org.apache.hadoop.hbase.io.ImmutableBytesWritable  import org.apache.hadoop.hbase.ma從hbase讀取數據轉化成RDD

本例基于官方提供的例子

package com.test    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  import org.apache.hadoop.hbase.client.HBaseAdmin  import org.apache.hadoop.hbase.mapreduce.TableInputFormat  import org.apache.spark._  import org.apache.hadoop.hbase.client.HTable  import org.apache.hadoop.hbase.client.Put  import org.apache.hadoop.hbase.util.Bytes  import org.apache.hadoop.hbase.io.ImmutableBytesWritable  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  import org.apache.hadoop.mapred.JobConf  import org.apache.hadoop.io._    object TestHBase2 {      def main(args: Array[String]): Unit = {      val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")      val sc = new SparkContext(sparkConf)            val tablename = "account"      val conf = HBaseConfiguration.create()      //設置zooKeeper集群地址,也可以通過將hbase-site.xml導入classpath,但是建議在程序里這樣設置      conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")      //設置zookeeper連接端口,默認2181      conf.set("hbase.zookeeper.property.clientPort", "2181")      conf.set(TableInputFormat.INPUT_TABLE, tablename)        // 如果表不存在則創建表      val admin = new HBaseAdmin(conf)      if (!admin.isTableAvailable(tablename)) {        val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))        admin.createTable(tableDesc)      }        //讀取數據并轉化成rdd      val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],        classOf[org.apache.hadoop.hbase.client.Result])        val count = hBaseRDD.count()      println(count)      hBaseRDD.foreach{case (_,result) =>{        //獲取行鍵        val key = Bytes.toString(result.getRow)        //通過列族和列名獲取列        val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))        val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))        println("Row key:"+key+" Name:"+name+" Age:"+age)      }}        sc.stop()      admin.close()    }  } 
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 惠东县| 北京市| 鸡泽县| 五家渠市| 若羌县| 大田县| 东方市| 轮台县| 达尔| 广南县| 定西市| 紫阳县| 白沙| 台北市| 游戏| 云林县| 商城县| 民县| 长丰县| 闻喜县| 连南| 镇赉县| 会同县| 大关县| 寻甸| 阿勒泰市| 绥化市| 班戈县| 孝义市| 吉木乃县| 泊头市| 墨脱县| 拜城县| 容城县| 西乌珠穆沁旗| 长兴县| 高邮市| 巴青县| 盐城市| 平顺县| 晋江市|