Kyuubi实践 | 编译 Spark3.1 以适配 CDH5 并集成 Kyuubi
共 18037字,需浏览 37分钟
·
2021-10-01 02:11
1. Spark ThriftServer 在我们业务场景中的不足
2. 关于 Kyuubi
3. 编译 Spark3.1.2 以适配 CDH5
4. 测试 Spark 是否正常可用
5. 安装配置 kyuubi
5.1 创建用户 kyuubi
5.2 配置 kyuubi 代理用户的权限
5.3 创建用户 kyuubi 的 principal 并导出 keytab
5.4 在 Sentry 中给 kyuubi 用户赋予角色以及权限
5.5 配置 Kyuubi
6. Kyuubi 测试
6.1 Kyuubi 多租户测试
6.2 Kyuubi CONNECTION 级别的引擎共享
7. 总结
1. Spark ThriftServer 在我们业务场景中的不足
STS 是 Spark 中自带的一个组件,它可以在 yarn 上启动一个常驻的 Spark 应用,然后对外提供 JDBC 服务。
STS 在我们内部最典型的应用场景是,每天大量的 Tableau Spark Thrift 数据源刷新任务,需要定时执行,并拉取数据到 Tableau Server 端。在较高的并发负载下或一次性拉取的数据量过多时,就大概率会出现 driver 端进程卡死的情况(具体表现是,driver 端进程老年代内存占用 100%,GC 线程无法工作,分析 dump 文件,发现有内存泄漏的现象发生)。
每当遇到这样的情况,只能选择重启 STS,这期间 Application 中运行的 job 也一并会被杀死,导致数据源刷新任务频繁重试,浪费集群资源的同时,也会造成报表数据刷新的 SLA 无法满足业务方的要求。
基于此,我们开始了对 kyuubi 的调研,并重点测试其高可用,以及 CONNECTION 级别的引擎共享能力。
2. 关于 Kyuubi
Kyuubi 是一个分布式多租户 Thrift JDBC/ODBC 服务,同样构建在 Apache Spark 之上,用于大规模数据管理、处理和分析,类似 STS,但功能比 STS 更加丰富和优良。
其更详细的文档以及与 STS 的深入对比,可以参考其官方文档:
https://blog.csdn.net/NetEaseResearch/article/details/115013920
https://kyuubi.apache.org/docs/stable/integrations/index.html
3. 编译 Spark3.1.2 以适配 CDH5
我们团队内部 Hadoop 集群的版本是:
hive 1.1.0-cdh5.13.1 hadoop 2.6.0-cdh-5.13.1
同时,集群开启了 Sentry 和 Kerberos。
从 Spark3.0.x 开始,官方要求编译 Spark 时所依赖的 Hadoop 的最低版本是 2.7,默认所支持的 hive 的版本也高于 1.1.0。基于此,本小节记录了针对在 CDH5 的环境中,编译以及测试 Spark3.1.2 的完整流程。
当然,如果你没有强迫症,也可以选择 spark3-hadoop-2.7 的安装包来进行测试,kyuubi 社区也有大佬做过这样的测试。
下载 Spark3.1.2 的代码并导入到 IDEA 中,修改 Spark 的父 pom 文件,加入aliyun
、cloudera
、maven center
的仓库地址,防止在国内的网络环境下,有些依赖的 jar 包下载不下来(原来的 repository 地址可以注释掉)。
<repository>
<id>aliyun</id>
<name>Nexus Release Repository</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>cloudera</id>
<name>cloudera Repository</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>central</id>
<name>Maven Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
在 pom 中加入 cdh-hadoop 的 profile,然后在 IDEA 中先尝试 build 下代码,看是否有报错发生。
<profile>
<id>hadoop2.6.0-cdh5.13.1</id>
<properties>
<hadoop.version>hadoop2.6.0-cdh5.13.1</hadoop.version>
<curator.version>2.7.1</curator.version>
<commons-io.version>2.4</commons-io.version>
<javax.servlet-api.name>servlet-api</javax.servlet-api.name>
</properties>
</profile>
点击绿色的锤子按钮,尝试 build 下代码。
正常情况下,在 build 的执行过程中,会有编译型异常发生。
Spark3.1.x 适配hadoop2.6.0-cdh5.13.1
需要修改代码的地方有两处:
一处是修改resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
,
参考:https://github.com/apache/spark/pull/16884/files
修改后的代码示例:
sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
// 原有代码
/* try {
val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
logAggregationContext.setRolledLogsIncludePattern(includePattern)
sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
logAggregationContext.setRolledLogsExcludePattern(excludePattern)
}
appContext.setLogAggregationContext(logAggregationContext)
} catch {
case NonFatal(e) =>
logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
"does not support it", e)
}*/
try {
val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
// These two methods were added in Hadoop 2.6.4, so we still need to use reflection to
// avoid compile error when building against Hadoop 2.6.0 ~ 2.6.3.
val setRolledLogsIncludePatternMethod =
logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
val setRolledLogsExcludePatternMethod =
logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
}
appContext.setLogAggregationContext(logAggregationContext)
} catch {
case NonFatal(e) =>
logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
"does not support it", e)
}
}
第二处修改/spark-3.1.2/core/src/main/scala/org/apache/spark/util/Utils.scala
:
def unpack(source: File, dest: File): Unit = {
// StringUtils 在hadoop2.6.0中引用不到,所以取消此import,然后修改为相似的功能
// val lowerSrc = StringUtils.toLowerCase(source.getName)
if (source.getName == null) {
throw new NullPointerException
}
val lowerSrc = source.getName.toLowerCase()
if (lowerSrc.endsWith(".jar")) {
RunJar.unJar(source, dest, RunJar.MATCH_ANY)
} else if (lowerSrc.endsWith(".zip")) {
FileUtil.unZip(source, dest)
} else if (
lowerSrc.endsWith(".tar.gz") || lowerSrc.endsWith(".tgz") || lowerSrc.endsWith(".tar")) {
FileUtil.unTar(source, dest)
} else {
logWarning(s"Cannot unpack $source, just copying it to $dest.")
copyRecursive(source, dest)
}
}
然后重新 build,修改完代码之后,build 的过程中应该不会再有报错发生。
hive 的版本就不要在 pom 中替换了,替换之后的编译报错修改起来会很麻烦。所以 hive 就用默认版本(2.3.7),因为从 Spark1.4.0 开始,Spark SQL 的单个二进制版本可用于查询不同版本的 Hive 元存储。具体内容可以参考:
https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore
运行打包命令前,请先修改下dev/make-distribution.sh
,具体修改内容如下:
# spark版本
VERSION=3.1.2
# scala版本
SCALA_VERSION=2.12
# hadoop版本
SPARK_HADOOP_VERSION=2.6.0-cdh5.13.1
# 开启hive
SPARK_HIVE=1
# 原来的内容注释掉
#VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ \
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | fgrep --count "<id>hive</id>";\
# # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
# # because we use "set -o pipefail"
# echo -n)
在 Spark 项目的根路径下运行编译打包的命令
./dev/make-distribution.sh --name 2.6.0-cdh5.13.1 --pip --tgz -Phive -Phive-thriftserver -Pmesos -Pyarn -Pkubernetes -Phadoop2.6.0-cdh5.13.1 -Dhadoop.version=2.6.0-cdh5.13.1 -Dscala.version=2.12.10
首次编译时会拉取大量依赖,遇到网络错误可以多试几次。编译过程中会下载如 maven 安装包、Scala 之类的软件包到项目根路径的 build 目录下,在网络慢的情况下,可以手动下载这些软件包,然后移动到相应的目录之下也行。
Mac 下 R 环境有些问题,所以这里编译时就没带上 sparkR 的支持,所以 sparkR 估计用不了,编译完成之后最终的安装包会输出在 Spark 项目的根路径下。
4. 测试 Spark 是否正常可用
编译的工作还算顺利,网上也有很多类似的文章可供参考,如:
https://www.codeleading.com/article/42014747657/
之后的工作便是测试 Spark 的可用性,这里我主要测试的是,跑通 spark-shell、spark-sql,正常提交作业到 yarn 集群,正常读取到 hive 表中的数据,至于其他更细致的测试,后续会有文章接着介绍。
Spark 无需分布式部署安装,只需要准备好一个 Spark 的客户端环境即可。
涉及到修改 Spark 配置的命令参考如下:
cd /opt/spark-3.1.2/conf
# 把hive hdfs 相关配置文件的软连接构建起来
ln -s /etc/hive/conf/hive-site.xml hive-site.xml
ln -s /etc/hive/conf/hdfs-site.xml hdfs-site.xml
ln -s /etc/hive/conf/core-site.xml core-site.xml
mv log4j.properties.template log4j.properties
mv spark-defaults.conf.template spark-defaults.conf
mv spark-env.sh.template spark-env.sh
vim spark-defaults.conf
# 自定义spark要加载的hive metastore的版本以及jar包的路径,注意在spark3.1.x 与 spark3.0.x中此处配置的差异
spark.sql.hive.metastore.version=1.1.0
spark.sql.hive.metastore.jars=path
spark.sql.hive.metastore.jars.path=file:///opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/hive/lib/*
spark.sql.hive.metastore.version 不要写1.1.0-cdh5.13.1
,spark 中 metastore 的自定义配置请参考:
http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#specifying-storage-format-for-hive-tables
测试 spark-shell 以及 spark-sql 的命令参考:
export SPARK_HOPME=/opt/spark-3.1.2
# 本地跑spark-shell
sh /opt/spark-3.1.2/bin/spark-shell
# 以yarn-client的方式跑spark-shell
sh /opt/spark-3.1.2/bin/spark-shell --master yarn --deploy-mode client --executor-memory 1G --num-executors 2
# 以yarn-cluster的方式跑spark-sql
sh /opt/spark-3.1.2/bin/spark-sql --master yarn -deploy-mode cluster --executor-memory 1G --num-executors 2
# 跑一些spark代码 操作hive表中的一些数据,测试正常就OK
5. 安装配置 kyuubi
kyuubi 的安装包可以选择官方制作好的,如:
也可以自己拉取代码进行编译,这里我选择自己编译 kyuubi,使用的版本是最新发布的
v1.3.0-incubating
https://github.com/apache/incubator-kyuubi/releases/tag/v1.3.0-incubating
kyuubi 的编译请参考:
https://kyuubi.apache.org/docs/r1.3.0-incubating/develop_tools/distribution.html
下载代码到本地,导入 IDEA 中(或不导入),在项目根路径下输入:
./build/dist --tgz --spark-provided
# --spark-provided 我的spark环境已有,这里选择不把spark一并打包
打包完成之后,上传 tar 包至服务器,解压缩,去掉配置文件的后缀.template
配置和运行 kyuubi 之前,需要先做一些额外的操作。
5.1 创建用户 kyuubi
创建一个独立用户 kyuubi 来起 kyuubi 服务
useradd kyuubi
5.2 配置 kyuubi 代理用户的权限
kyuubi 需要代理其他用户来启动引擎,所以需要为其设置代理用户的权限。
5.3 创建用户 kyuubi 的 principal 并导出 keytab
命令参考:
kadmin.local
addprinc -pw 123456 kyuubi/node2.bigdata.leo.com@LEO.COM
ktadd -k kyuubi.keytab -norandkey kyuubi/node2.bigdata.leo.com@LEO.COM
其中 principal 的组成有 kyuubi node2.bigdata.leo.com @LEO.COM 这三部分组成,如果设置为kyuubi@LEO.COM
,运行 kyuubi 时会报错。
5.4 在 Sentry 中给 kyuubi 用户赋予角色以及权限
无 Sentry 可跳过授权,但还是需要注意 kyuubi 用户在 hdfs 以及 hive 中是否存在权限问题。
5.5 配置 Kyuubi
上述准备工作完成之后,可以配置 kyuubi。参考命令以及参考的配置文件。
vim kyuubi-defaults.conf
# 启动的spark引擎以yarn-cluster模式跑
spark.master=yarn
spark.submit.deployMode=cluster
# spark引擎共享级别,user,即同一用户共享一个引擎,
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H
# 启用HA,指定ZK地址
kyuubi.ha.enabled true
kyuubi.ha.zookeeper.quorum node1:2181,node2:2181,node3:2181
kyuubi.ha.zookeeper.client.port 2181
# kerbero授权开启
kyuubi.authentication=KERBEROS
kyuubi.kinit.principal=kyuubi/node2.bigdata.leo.com@LEO.COM
kyuubi.kinit.keytab=/home/kyuubi/kyuubi.keytab
# kyuubi环境变量配置,按需配置
vim kyuubi-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
export SPARK_HOME=/opt/spark-3.1.2
关于 kyuubi 更详细的配置参数,请参考。
https://kyuubi.apache.org/docs/r1.3.0-incubating/deployment/settings.html#kyuubi-configurations
运行 kyuubi
bin/kyuubi start
运行 beeline 测试连接 kyuubi
beeline -u "jdbc:hive2://node2.bigdata.leo.com:10009/;principal=kyuubi/node2.bigdata.leo.com@LEO.COM"
查看日志,先查看 kyuubi 服务运行过程中的输出日志
/opt/kyuubi-1.3.0/logs/kyuubi-kyuubi-org.apache.kyuubi.server.KyuubiServer-node2.bigdata.leo.com.out
此日志中没有上述异常发生的具体原因,继续看日志,此日志为 kyuubi 启动的 Spark 的 engine 日志
/opt/kyuubi-1.3.0/work/kyuubi/kyuubi-spark-sql-engine.log.0
此日志中依旧没有有用的信息,继续看 spark application 的日志,由于没有 Spark History 服务,所以我这里用 yarn 命令来看日志:
yarn logs -applicationId application_1632672215527_0014
由于开启了 HA,kyuubi 会在指定 ZK 上创建一些 znode path,由于我的 zookeeper 服务在 kerberos 环境中,因此可能会遇到 zk kerberos 认证,或 zk node path 的 ACL 问题,在 zk shell 查看/kyuubi_USER 的 ACL,并尝试是否可以创建 znode path。
经过上述测试可知,kyuubi 用户 kinit 拿到 kerberos 凭证之后,再去操作/kyuubi_USER 这个 znode path 时,并不会出现 ACL 问题。
根据官方大佬的解释以及大致看了下源码之后得知,这里出现 ZK ACL 的问题是因为,kyuubi server 启动,创建 zkClient 时,会获取 kerberos 认证。但在 spark engine 的 driver 端创建的 zkClient,并没有去获取 Kerberos 认证,所以才产生这个报错(分析不到位的地方还望与我沟通)。
临时的解决办法是取消 HA 配置或使用其他未开启 kerberos 的 Zookeeper(待确认)。禁用 HA 使用 kyuubi 自带的 ZK 服务,需要修改 kyuubi-defaults.conf 为:
spark.master=yarn
spark.submit.deployMode=cluster
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H
kyuubi.ha.enabled false
kyuubi.zookeeper.embedded.client.port 2182
kyuubi.authentication=KERBEROS
kyuubi.kinit.principal=kyuubi/node2.bigdata.leo.com@LEO.COM
kyuubi.kinit.keytab=/home/kyuubi/kyuubi.keytab
重启 kyuubi 之后,再用 beeline 连接,测试正常。
6. Kyuubi 测试
6.1 Kyuubi 多租户测试
保证 kyuubi server 正常启动,创建一个测试用户 leo_jie,并为其创建 principal 以及导出对应用户的 keytab,分别在两个客户端上用 kyuubi 以及 leo_jie 来测试连接 kyuubi server。示例命令如下:
# 用户leo_jie的操作
kinit -kt leo_jie.keytab leo_jie@LEO.COM
beeline -u "jdbc:hive2://node2.bigdata.leo.com:10009/;principal=kyuubi/node2.bigdata.leo.com@LEO.COM"
# 用户kyuubi的操作
kinit -kt kyuubi.keytab kyuubi/node2.bigdata.leo.com@LEO.COM
beeline -u "jdbc:hive2://node2.bigdata.leo.com:10009/;principal=kyuubi/node2.bigdata.leo.com@LEO.COM"
在 yarn ui 上可以看到起了两个 application,一个所属用户是 leo_jie,一个是 kyuubi。如果是 STS,则需要连续手动启动两个 STS 进程。
Spark Engine 空闲一段时间之后就会被释放,这个空闲时间可由参数kyuubi.session.engine.idle.timeout
来控制。
6.2 Kyuubi CONNECTION 级别的引擎共享
从资源共享的角度,Engine 支持用户端配置不同的共享级别,如果设置为 CONNECTION 级别共享,用户的一次 JDBC 连接就会初始化一个 Engine(例如:起多个 beeline 客户端,就会起多个 Spark Engine,对应多个 Spark Application),在这个连接内,用户可以执行多个 Statement 直至连接关闭。
如果设置为 USER 级别共享,用户的多次 JDBC 都会复用这个 Engine(例如:起多个 beeline 客户端,只会起一个 Spark Engine,对应一个 Spark Application),在高可用的模式中,无论用户的连接打到哪个 Kyuubi Server 实例上,这个 Engine 都能实现共享。
配置 conf/kyuubi-defaults.conf 以支持 CONNECTION 级别的引擎共享
kyuubi.engine.share.level=CONNECTION
# kyuubi.engine.share.level=USER
# kyuubi.session.engine.idle.timeout=PT10H
在多个客户端中起多个 beeline 客户端,yarn-ui 上可以看到每一个客户端对应着一个 Application,客户端连接断开之后,对应的 Spark Application 也会被释放掉。
7. 总结
上述文章梳理并记录了 Spark3.1.2 适配 CDH5 Hadoop 的编译和功能测试过程,并初步体验了 kyuubi,简单测试了 kyuubi 的多租户隔离特性,以及不同级别的 Engine 共享能力。
尤其是 Kyuubi 的 CONNECTION 级别的引擎共享机制,可以有效地解决 STS 被某些 SQL 执行时卡死的情况,每一个 tableau 数据源刷新任务触发时所建立的对 Kyuubi Server 的 ODBC 连接,都能对应一个独立的 Driver,就算此 Driver 被打死,也不会影响其他 SQL 的正常执行。
之后会继续为大家介绍我们在生产环境中对 Kyuubi 的使用经验。