Spark读写Hbase(用户画像)
背景
依旧是公司用户画像项目,目前方案是将hive聚合之后的标签表全部倒入mysql,然后在ES建立索引,虽然限定了最大查询范围为90天的数据,但是面对千万级的用户量,90天的数据依旧是非常庞大,每天查询的效率依旧是在30分钟以上,所以准备对这块进行优化。
在公司层面进行调研之后发现,公司遗留了一个小的Hbase集群,集群配置:
1 active master, 1 backup masters, 2 servers。
但是问题是集群的版本为:1.1.2,非常低,在上篇文章中:
为了做用户画像的流程打通,本地建立的Hbase版本为1.3.6,spark版本为2.4,所以整套体系都不支持公司原来的hbase集群体系,为了保障之后的用户画像能落地,有两套解决方案:
1、公司层面的Hbase集群升级,由于历史包袱太重,之前的版本虽然老,但是依旧有部分的数据在上面跑,如果版本升级,后续对应的下游系统中间件可能会出现不兼容的问题,而且在Hbase层做适配需要调研太多的下游业务的使用场景,成本太高,所以未选用。
2、将本地的伪分布式Hbase进行降级,同时spark版本也进行降级处理。目前测试环境选定的Hbase版本为1.1.2,spark版本为2.1.1。
下面的文章主要是基于这两个版本的中间件,进行spark对Hbase的读写操作。
介绍
1、Hbase版本降级,1.1.2版本为2015年的版本
wget archive.apache.org/dist
2、按照上篇文章对Hbase进行配置和按照测试
访问页面:localhost:60010/master-
启动hbase shell,并插入数据:
Scan的结果:Row_key、列族、列名、时间戳、value值
实际结果用图标展现为:
转换为常见的关系型数据库的视角来看的话:
从表述中能看出来在关系型数据库中的存储同一份数据需要3行,而在Hbase是一行的,而且同一个列族是在同一个Store里面的,更加方便查询。
3、修改pom.xml文件
<properties>
<scala.version>2.11</scala.version>
<spark.version>2.1.1</spark.version>
<hadoop.version>2.7.7</hadoop.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.1.2</version>
</dependency>
主要是添加hbase-client、hbase-server、hbase-common,并将spark版本修改为2.1.1。
3、spark读写Hbase测试
往Hbase里面写数据:
通过HTable中put方法:
package Flink
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapred.JobConf
/** *
*
* @autor gaowei
* @Date 2020-07-28 17:55
*/
object HbaseT1 {
def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",quorum)
conf.set("hbase.zookeeper.property.clientPort",port)
conf
}
def getHBaseAdmin(conf:Configuration,tableName:String) = {
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
admin.createTable(tableDesc)
}
admin
}
def getTable(conf:Configuration,tableName:String) = {
new HTable(conf,tableName)
}
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val tableName = "TEST.USER_INFO"
val quorum = "localhost"
val port = "2181"
// 配置相关信息
val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))
indataRDD.foreachPartition(x=> {
val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
val htable = getTable(conf,tableName)
x.foreach(y => {
val arr = y.split(",")
val key = arr(0)
val value = arr(1)
val value1 = arr(2)
val value2 = arr(3)
val put = new Put(Bytes.toBytes(key))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(value))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(value1))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(value2))
htable.put(put)
})
})
sc.stop()
}
}
TableOutputFormat向HBase写数据
package Flink
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapred.JobConf
/** *
*
* @autor gaowei
* @Date 2020-07-28 17:55
*/
object HbaseT1 {
def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",quorum)
conf.set("hbase.zookeeper.property.clientPort",port)
conf
}
def getHBaseAdmin(conf:Configuration,tableName:String) = {
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
admin.createTable(tableDesc)
}
admin
}
def getTable(conf:Configuration,tableName:String) = {
new HTable(conf,tableName)
}
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val tableName = "TEST.USER_INFO"
val quorum = "localhost"
val port = "2181"
// 配置相关信息
val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))
val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
indataRDD.map(_.split(",")).map{arr => {
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(arr(2)))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(arr(3)))
(new ImmutableBytesWritable,put)
}}.saveAsHadoopDataset(jobConf)
sc.stop()
}
}
4、读Hbase的数据
package Flink
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
/** *
*
* @autor gaowei
* @Date 2020-07-27 17:06
*/
object HbseTest {
def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",quorum)
conf.set("hbase.zookeeper.property.clientPort",port)
conf
}
def getHBaseAdmin(conf:Configuration,tableName:String) = {
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
admin.createTable(tableDesc)
}
admin
}
def getTable(conf:Configuration,tableName:String) = {
new HTable(conf,tableName)
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkConf = new SparkConf().setAppName("HBaseReadTest").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val tableName = "TEST.USER_INFO"
val quorum = "localhost"
val port = "2181"
// 配置相关信息
val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableInputFormat.INPUT_TABLE,tableName)
// HBase数据转成RDD
val hBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]).cache()
// RDD数据操作
val data = hBaseRDD.map(x => {
val result = x._2
val key = Bytes.toString(result.getRow)
val value = Bytes.toString(result.getValue("INFO".getBytes,"C1".getBytes))
val value1 = Bytes.toString(result.getValue("INFO".getBytes,"C2".getBytes))
val value2 = Bytes.toString(result.getValue("INFO".getBytes,"C3".getBytes))
(key,value,value1,value2)
})
data.foreach(println)
sc.stop()
}
}
结果:
去Hbase查询的结果:
结论:
Hadoop本质上是:分布式文件系统(HDFS) + 分布式计算框架(Mapreduce) + 调度系统Yarn搭建起来的分布式大数据处理框架。
Hive:是一个基于Hadoop的数据仓库,适用于一些高延迟性的应用(离线开发),可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能。
Hive可以认为是MapReduce的一个包装,把好写的HQL转换为的MapReduce程序,本身不存储和计算数据,它完全依赖于HDFS和MapReduce,Hive中的表是纯逻辑表。hive需要用到hdfs存储文件,需要用到MapReduce计算框架。
HBase:是一个Hadoop的数据库,一个分布式、可扩展、大数据的存储。hbase是物理表,不是逻辑表,提供一个超大的内存hash表,搜索引擎通过它来存储索引,方便查询操作。
HBase可以认为是HDFS的一个包装。他的本质是数据存储,是个NoSql数据库;HBase部署于HDFS之上,并且克服了hdfs在随机读写方面的缺点,提高查询效率。
推荐阅读: