Spark读写Hbase(用户画像)

共 12408字,需浏览 25分钟

 ·

2021-08-13 22:34

背景

依旧是公司用户画像项目,目前方案是将hive聚合之后的标签表全部倒入mysql,然后在ES建立索引,虽然限定了最大查询范围为90天的数据,但是面对千万级的用户量,90天的数据依旧是非常庞大,每天查询的效率依旧是在30分钟以上,所以准备对这块进行优化。

在公司层面进行调研之后发现,公司遗留了一个小的Hbase集群,集群配置:

1 active master, 1 backup masters, 2 servers。

但是问题是集群的版本为:1.1.2,非常低,在上篇文章中:

为了做用户画像的流程打通,本地建立的Hbase版本为1.3.6,spark版本为2.4,所以整套体系都不支持公司原来的hbase集群体系,为了保障之后的用户画像能落地,有两套解决方案:

1、公司层面的Hbase集群升级,由于历史包袱太重,之前的版本虽然老,但是依旧有部分的数据在上面跑,如果版本升级,后续对应的下游系统中间件可能会出现不兼容的问题,而且在Hbase层做适配需要调研太多的下游业务的使用场景,成本太高,所以未选用。

2、将本地的伪分布式Hbase进行降级,同时spark版本也进行降级处理。目前测试环境选定的Hbase版本为1.1.2,spark版本为2.1.1。

下面的文章主要是基于这两个版本的中间件,进行spark对Hbase的读写操作。

介绍

1、Hbase版本降级,1.1.2版本为2015年的版本

wget archive.apache.org/dist

2、按照上篇文章对Hbase进行配置和按照测试

访问页面:localhost:60010/master-

启动hbase shell,并插入数据:

Scan的结果:Row_key、列族、列名、时间戳、value值

实际结果用图标展现为:

转换为常见的关系型数据库的视角来看的话:

从表述中能看出来在关系型数据库中的存储同一份数据需要3行,而在Hbase是一行的,而且同一个列族是在同一个Store里面的,更加方便查询。

3、修改pom.xml文件

    <properties>
<scala.version>2.11</scala.version>
<spark.version>2.1.1</spark.version>
<hadoop.version>2.7.7</hadoop.version>
</properties>

<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>

<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<!-- <scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.2</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.1.2</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.1.2</version>
</dependency>

主要是添加hbase-client、hbase-server、hbase-common,并将spark版本修改为2.1.1。

3、spark读写Hbase测试

往Hbase里面写数据:

  • 通过HTable中put方法:

package Flink
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapred.JobConf
/** *
*
* @autor gaowei
* @Date 2020-07-28 17:55
*/
object HbaseT1 {
def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",quorum)
conf.set("hbase.zookeeper.property.clientPort",port)

conf
}
def getHBaseAdmin(conf:Configuration,tableName:String) = {
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
admin.createTable(tableDesc)
}

admin
}
def getTable(conf:Configuration,tableName:String) = {
new HTable(conf,tableName)
}
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
val sc = new SparkContext(sparkConf)

val tableName = "TEST.USER_INFO"
val quorum = "localhost"
val port = "2181"

// 配置相关信息

val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)


val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))

indataRDD.foreachPartition(x=> {
val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)

val htable = getTable(conf,tableName)

x.foreach(y => {
val arr = y.split(",")
val key = arr(0)
val value = arr(1)
val value1 = arr(2)
val value2 = arr(3)

val put = new Put(Bytes.toBytes(key))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(value))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(value1))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(value2))
htable.put(put)
})
})
sc.stop()
}

}
  • TableOutputFormat向HBase写数据

package Flink
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapred.JobConf
/** *
*
* @autor gaowei
* @Date 2020-07-28 17:55
*/
object HbaseT1 {
def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",quorum)
conf.set("hbase.zookeeper.property.clientPort",port)

conf
}
def getHBaseAdmin(conf:Configuration,tableName:String) = {
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
admin.createTable(tableDesc)
}

admin
}
def getTable(conf:Configuration,tableName:String) = {
new HTable(conf,tableName)
}
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
val sc = new SparkContext(sparkConf)

val tableName = "TEST.USER_INFO"
val quorum = "localhost"
val port = "2181"

// 配置相关信息

val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)


val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))

val jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
indataRDD.map(_.split(",")).map{arr => {
val put = new Put(Bytes.toBytes(arr(0)))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(arr(2)))
put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(arr(3)))
(new ImmutableBytesWritable,put)
}}.saveAsHadoopDataset(jobConf)
sc.stop()
}

}

4、读Hbase的数据

package Flink
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
/** *
*
* @autor gaowei
* @Date 2020-07-27 17:06
*/
object HbseTest {

def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",quorum)
conf.set("hbase.zookeeper.property.clientPort",port)

conf
}
def getHBaseAdmin(conf:Configuration,tableName:String) = {
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
admin.createTable(tableDesc)
}

admin
}
def getTable(conf:Configuration,tableName:String) = {
new HTable(conf,tableName)
}

def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
val sparkConf = new SparkConf().setAppName("HBaseReadTest").setMaster("local[2]")
val sc = new SparkContext(sparkConf)

val tableName = "TEST.USER_INFO"
val quorum = "localhost"
val port = "2181"

// 配置相关信息
val conf = getHBaseConfiguration(quorum,port,tableName)
conf.set(TableInputFormat.INPUT_TABLE,tableName)

// HBase数据转成RDD
val hBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]).cache()

// RDD数据操作
val data = hBaseRDD.map(x => {
val result = x._2
val key = Bytes.toString(result.getRow)
val value = Bytes.toString(result.getValue("INFO".getBytes,"C1".getBytes))
val value1 = Bytes.toString(result.getValue("INFO".getBytes,"C2".getBytes))
val value2 = Bytes.toString(result.getValue("INFO".getBytes,"C3".getBytes))
(key,value,value1,value2)
})

data.foreach(println)

sc.stop()
}

}

结果:

去Hbase查询的结果:

结论:

Hadoop本质上是:分布式文件系统(HDFS) + 分布式计算框架(Mapreduce) + 调度系统Yarn搭建起来的分布式大数据处理框架。

Hive:是一个基于Hadoop的数据仓库,适用于一些高延迟性的应用(离线开发),可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能。

Hive可以认为是MapReduce的一个包装,把好写的HQL转换为的MapReduce程序,本身不存储和计算数据,它完全依赖于HDFS和MapReduce,Hive中的表是纯逻辑表。hive需要用到hdfs存储文件,需要用到MapReduce计算框架。

HBase:是一个Hadoop的数据库,一个分布式、可扩展、大数据的存储。hbase是物理表,不是逻辑表,提供一个超大的内存hash表,搜索引擎通过它来存储索引,方便查询操作。

HBase可以认为是HDFS的一个包装。他的本质是数据存储,是个NoSql数据库;HBase部署于HDFS之上,并且克服了hdfs在随机读写方面的缺点,提高查询效率。

推荐阅读:

世界的真实格局分析,地球人类社会底层运行原理

不是你需要中台,而是一名合格的架构师(附各大厂中台建设PPT)

企业IT技术架构规划方案

论数字化转型——转什么,如何转?

华为干部与人才发展手册(附PPT)

企业10大管理流程图,数字化转型从业者必备!

【中台实践】华为大数据中台架构分享.pdf

华为的数字化转型方法论

华为如何实施数字化转型(附PPT)

超详细280页Docker实战文档!开放下载

华为大数据解决方案(PPT)

浏览 26
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报