本文將介紹
1、Spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset將RDD寫入Hbase
2、spark從hbase中讀取數(shù)據(jù)并轉(zhuǎn)化為RDD
操作方式為在eclipse本地運(yùn)行spark連接到遠(yuǎn)程的hbase。
java版本:1.7.0
Scala版本:2.10.4
zookeeper版本:3.4.5(禁用了hbase自帶zookeeper,選擇自己部署的)
Hadoop版本:2.4.1
spark版本:1.6.1
hbase版本:1.2.3
集群:centos6.5_x64
注意點(diǎn):
依賴:
將lib目錄下的hadoop開頭jar包、hbase開頭jar包添加至classpath
此外還有l(wèi)ib目錄下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少會(huì)提示hbase RpcRetryingCaller: Call exception不斷嘗試重連hbase,不報(bào)錯(cuò))、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar
$SPARK_HOME/lib目錄下的 spark-assembly-1.6.1-hadoop2.4.0.jar
不同的package中可能會(huì)有相同名稱的類,不要導(dǎo)錯(cuò)
連接集群:
spark應(yīng)用需要連接到zookeeper集群,然后借助zookeeper訪問(wèn)hbase。一般可以通過(guò)兩種方式連接到zookeeper:
第一種是將hbase-site.xml文件加入classpath
第二種是在HBaseConfiguration實(shí)例中設(shè)置
如果不設(shè)置,默認(rèn)連接的是localhost:2181會(huì)報(bào)錯(cuò):connection refused
本文使用的是第二種方式。
hbase創(chuàng)建表:
雖然可以在spark應(yīng)用中創(chuàng)建hbase表,但是不建議這樣做,最好在hbase shell中創(chuàng)建表,spark寫或讀數(shù)據(jù)
使用saveAsHadoopDataset寫入數(shù)據(jù)
package com.test import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.ma從hbase讀取數(shù)據(jù)轉(zhuǎn)化成RDD本例基于官方提供的例子
package com.test import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.io._ object TestHBase2 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local") val sc = new SparkContext(sparkConf) val tablename = "account" val conf = HBaseConfiguration.create() //設(shè)置zooKeeper集群地址,也可以通過(guò)將hbase-site.xml導(dǎo)入classpath,但是建議在程序里這樣設(shè)置 conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3") //設(shè)置zookeeper連接端口,默認(rèn)2181 conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set(TableInputFormat.INPUT_TABLE, tablename) // 如果表不存在則創(chuàng)建表 val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(tablename)) { val tableDesc = new HTableDescriptor(TableName.valueOf(tablename)) admin.createTable(tableDesc) } //讀取數(shù)據(jù)并轉(zhuǎn)化成rdd val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) val count = hBaseRDD.count() println(count) hBaseRDD.foreach{case (_,result) =>{ //獲取行鍵 val key = Bytes.toString(result.getRow) //通過(guò)列族和列名獲取列 val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes)) val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes)) println("Row key:"+key+" Name:"+name+" Age:"+age) }} sc.stop() admin.close() } }新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注