Kyuubi 实践 | Apache Kyuubi on Spark 在CDH上的深度实践
Kyuubi 是网易有数的大数据开源项目,于2021年6月全票通过进入世界顶级开源基金会 Apache Software Foundation 孵化器。
Kyuubi 的命名源自中国神话《山海经》,意为“九尾狐”。狐会喷火,象征Spark;狐有九尾,类比多租户,在Spark上实现多租户是系统设计之初的主要目的。然后取了动漫《火影忍者》中角色九尾的罗马音['kju:bi:],作为项目名称。
Kyuubi 的目标是让“大数据平民化”。为实现这个目标,我们遵循“专业人做专业事”的准则,通过 Kyuubi的 C/S 架构,服务端大数据专家可以将 Spark 等大数据算力极致优化并高度封装后提供出来,业务端可通过该算力直接在自己擅长的业务领域处理数据产生价值,两者之间通过简单的接口进行必要且有效的直接交互。
Kyuubi 使用场景:
替换 HiveServer2,轻松获得 10~100 倍性能提升
构建 Serverless Spark 平台
构建统一数据湖探索分析管理平台
CDH 最后一个免费版 6.3.2 发布一年有余,离线计算核心组件版本停在了 Hadoop 3.0.0,Hive 2.1.1,Spark 2.4.0。
随着 Spark 3.0 的重磅发布,在性能方面又迎来了一次飞跃,本文将描述把 Spark 3 集成到 CDH 6.3.1(未开启 Kerberos) 的过程,并使用 Kyuubi 替换 HiveServer2,实现 OLAP、ETL 等场景下从 HiveQL 到 SparkSQL 的无缝迁移,享受 10x-100x 的性能红利。
CDH 缺陷修复
[ORC-125] 修复 Hive 不能读取高版本 ORC 写入的数据
当使用 Hive 读取由 Presto 或者 Spark 等写入的 ORC 文件时,会出现以下错误。
ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6
该问题在 ORC 上游被修复 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE。
ORC 最早是 Hive 的一个子项目,在 CDH 6 集成的 Hive 2.1 这个版本里,ORC 还没有分离出去,所以这个问题要在 Hive 源码里修复。
我做了一个打包好的修复版本,GitHub 传送门 ①,下载更换 /opt/cloudera/parcels/CDH/lib/hive/lib 路径下的 hive-exec-2.1.1-cdh6.3.1.jar, hive-orc-2.1.1-cdh6.3.1.jar即可。(至少需要更换 Hadoop Client、HiveServer2 节点,如果你不知道我在说什么,就把所有节点都换掉)
Spark 3.1
[SPARK-33212] Spark 使用 Hadoop Shaded Client
Hadoop 3.0 提供了 Shaded Client,用于下游项目规避依赖冲突 [HADOOP-11656] Classpath isolation for downstream clients。
Spark 3.2 在 hadoop-3.2 profile 中切换到了 Hadoop Shaded Client [SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile。
该更改不是必须的,但个人建议从 Spark 主线将该补丁移植到 branch-3.1 使用,以规避潜在的依赖冲突。
[CDH-71907] Spark HiveShim 适配 CDH Hive
Spark 通过反射和隔离的类加载器来实现对多版本 Hive Metastore 的支持,详情参考 Interacting with Different Versions of Hive Metastore - Spark Documentation。
CDH 6 使用修改过的 Hive 2.1.1 版本,其方法签名与 Apache 版本有所不同,故 Spark HiveShim 反射调用会出现找不到方法签名,需要手动将 CDH-71907 补丁打到 branch-3.1。
Spark External Shuffle Service 协议兼容
Spark shuffle 时,mapper 会将数据写入到本地磁盘而非 HDFS,引入 ESS 后,会将文件信息注册到 ESS 中,将 mapper 与 reducer 解耦。CDH 中,默认会启用 Spark Yarn External Shuffle Service,作为 YARN AUX Service 在所有 Yarn Node 上启动。
Spark 3 修改了 shuffle 通信协议,在与 CDH 2.4 版本的 ESS 交互时,需要设置 spark.shuffle.useOldFetchProtocol=true,否则可能报如下错误。[SPARK-29435] Spark 3 doesn't work with older shuffle service
IllegalArgumentException: Unexpected message type: <number>.
Spark 版本迁移指南
如果你有现存的基于 CDH Spark 2.4 的 Spark SQL/Job,在将其迁移至 Spark 3 版本前,请参阅完整的官方迁移指南。
Migration Guide: Spark Core - Spark Documentation ②
Migration Guide: SQL, Datasets and DataFrame - Spark Documentation ③
Spark 部署
官方文档 Running Spark on YARN - Documentation 中提到
To make Spark runtime jars accessible from YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME
https://spark.apache.org/docs/latest/running-on-yarn.html#preparations
因此,无需在 CDH 所有节点上部署 Spark 3,只需在 Hadoop Client 节点上部署 Spark 3 即可。
如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。
我基于 Spark 3.1.2 制作了一个适配 CDH 6 的版本, GitHub 传送门 ,下载解压至 /opt,并软链至 /opt/spark3。
[hive@cdh-kyuubi]$ ls -l /opt | grep spark
lrwxrwxrwx 1 root root 39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2
drwxr-xr-x 13 hive hive 4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2
配置 Hadoop、Hive
CDH 会将配置文件自动分发到所有节点 /etc 目录下,建立软链即可。
ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/
配置 Spark 环境变量 /opt/spark3/conf/spark-env.sh
#!/usr/bin/env bash
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf
配置 Spark 默认参数 /opt/spark3/conf/spark-defaults.conf
请参考 Configuration - Spark Documentation 根据集群环境实际情况进行微调
spark.authenticate=false
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
spark.driver.log.dfsDir=/user/spark/driverLogs
spark.driver.log.persistToDfs.enabled=true
spark.files.overwrite=true
spark.files.useFetchCache=false
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.shuffle.useOldFetchProtocol=true
spark.ui.enabled=true
spark.ui.killEnabled=true
spark.yarn.historyServer.address=http://cdh-master2:18088
spark.yarn.historyServer.allowTracking=true
spark.master=yarn
spark.submit.deployMode=cluster
spark.driver.memory=2G
spark.executor.cores=6
spark.executor.memory=8G
spark.executor.memoryOverhead=2G
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=2G
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.sql.cbo.enabled=true
spark.sql.cbo.starSchemaDetection=true
spark.sql.datetime.java8API.enabled=false
spark.sql.sources.partitionOverwriteMode=dynamic
spark.sql.hive.convertMetastoreParquet=false
spark.sql.hive.convertMetastoreParquet.mergeSchema=false
spark.sql.hive.metastore.version=2.1.1
spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.orc.mergeSchema=true
spark.sql.parquet.mergeSchema=true
spark.sql.parquet.writeLegacyFormat=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.forceApply=false
spark.sql.adaptive.logLevel=info
spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024
spark.sql.adaptive.fetchShuffleBlocksInBatch=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128m
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
spark.sql.autoBroadcastJoinThreshold=-1
验证 spark-shell 工作正常
[hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://cdh-master1:4040
Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2-cdh6
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show
+----------------+
| namespace|
+----------------+
| test|
+----------------+
scala>
至此,Spark 3 on CDH 6 已经部署完成,可以像 CDH 自带的 Spark 2.4 一样,正常的使用 spark-submit、spark-shell,但依旧不支持 spark-sql,spark-thriftserver。
相比略显鸡肋的 spark-sql,spark-thriftserver,Kyuubi 是更好的选择,因此我故意移除了这两个功能。
注意:我提供的构建版,没有启用 spark-thriftserver 模块,为正常使用 Kyuubi,请下载hive-service-rpc-3.1.2.jar 添加到 /opt/spark3/jars 路径。
Kyuubi —— 解锁 Spark SQL 更多场景
简言之,Apache Kyuubi (Incubating) 之于 Spark,类似 HiveServer2 之于 Hive。
Kyuubi 通过将 Spark 暴露一个与 HiveServer2 完全兼容的 Thrift API,可以兼容现有的 Hive 生态,如 beeline,hive-jdbc-driver,HUE,Superset 等。
可以通过以下文档来了解 Kyuubi 的架构,Kyuubi 与 HiveServer2 和 Spark Thrift Server 的异同。
Welcome to Kyuubi’s documentation ④
Kyuubi Architecture — Kyuubi documentation ⑤
Kyuubi v.s. HiveServer2 — Kyuubi documentation ⑥
Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS) — Kyuubi documentation ⑦
Kyuubi 部署
Kyuubi 无需任何修改即可适配 CDH 6,下面给出关键步骤,详情可以参考 Deploy Kyuubi engines on Yarn — Kyuubi documentation ⑧
同样的,如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。
解压部署
下载 kyuubi-1.3.0-incubating-bin.tgz 解压至 /opt,并创建软链到 /opt/kyuubi。
[hive@cdh-external opt]$ ls -l /opt | grep kyuubi
lrwxrwxrwx 1 root root 32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bin
drwxrwxr-x 13 hive hive 4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin
修改配置
按需修改/opt/kyuubi/conf/kyuubi-env.sh
export JAVA_HOME=/usr/java/default
export SPARK_HOME=/opt/spark3
export SPARK_CONF_DIR=${SPARK_HOME}/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export KYUUBI_PID_DIR=/data/log/service/kyuubi/pid
export KYUUBI_LOG_DIR=/data/log/service/kyuubi/logs
export KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/work
export KYUUBI_MAX_LOG_FILES=10
参考Kyuubi Configurations System — Kyuubi documentation ⑨
按需修改/opt/kyuubi/conf/kyuubi-defaults.conf
kyuubi.authentication=NONE
kyuubi.engine.share.level=USER
kyuubi.frontend.bind.host=0.0.0.0
kyuubi.frontend.bind.port=10009
kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181
kyuubi.ha.zookeeper.namespace=kyuubi
kyuubi.session.engine.idle.timeout=PT10H
spark.master=yarn
spark.submit.deployMode=cluster
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=20
spark.dynamicAllocation.executorIdleTimeout=60
注意:kyuubi-defaults.conf 中的 spark 配置优先级高于 spark-defaults.conf
性能调优
How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation ⑩
How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation ⑪
启动
使用/opt/kyuubi/bin/kyuubi 在前台或后台启动 Kyuubi Server。
[hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --help
Usage: bin/kyuubi command
commands:
start - Run a Kyuubi server as a daemon
run - Run a Kyuubi server in the foreground
stop - Stop the Kyuubi daemon
status - Show status of the Kyuubi daemon
-h | --help - Show this help message
Beeline 连接
使用默认参数连接 Kyuubi
beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata
使用自定义参数连接 Kyuubi
beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch
详细配置参考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation ⑫
HUE 连接
简单说,在 Cloudera Manager 中修改 HUE 配置项 Hue Service Advanced Configuration Snippet 如下,即可在 HUE 中开启 Spark SQL 引擎。
[desktop]
app_blacklist=zookeeper,hbase,impala,search,sqoop,security
use_new_editor=true
[[interpreters]]
[[[sparksql]]]
name=Spark SQL
interface=hiveserver2
[[[hive]]]
name=Hive
interface=hiveserver2
# other interpreters
...
[spark]
sql_server_host=kyuubi
sql_server_port=10009
详细配置参考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation ⑬
Kyuubi engine 共享级别与应用场景
Kyuubi Spark engine 即一个 Spark driver,通过控制 engine 的共享策略,可以在隔离性和资源利用率上取得平衡。Kyuubi 提供 3 种 engine 共享级别,分别为 CONNECTION,USER(默认),SERVER。
下面的讨论均假设使用 YARN Cluster 模式启动 Kyuubi Spark engine。
我们首先补充一些时间开销和 Spark 操作执行的信息。
Kyuubi Spark engine 在冷启动时,会由 Kyuubi Server 通过spark-submit 命令向 YARN 提交一个 Spark App,即 engine,该过程从提交到 Spark driver 启动约需要 5-6s,然后 engine 将自己注册到 Zookeeper,Kyuubi Server 监听 Zookeeper 发现 engine 并建立连接,该过程约 1-2s,如此算来,在 YARN 资源空闲时,整个 engine 冷启动时间约 6-8s;Client 连接到存在的 engine,通常耗时 1s 内。
如果启动了 Spark 的 executor 动态伸缩特性,真正执行 SQL 任务时,如果资源有富余,会动态创建 executor,每个 executor 创建耗时约为 2-3s。
元数据、DDL 等操作,如获取 database 列表,CREATE TABLE语句等,会在 driver 上执行;计算任务如 JOIN、COUNT() 等,会由 driver 生成执行计划,在 executor 上执行。
在我们的生产环境中,大概有如下三种使用场景:
(1)使用 HUE 进行 ad-hoc 查询
该场景中,会有多个用户进行查询,一般会运行相对较大的查询任务,用户对连接创建时间以及元数据加载时间较为敏感,但对查询结果响应时间有一定的容忍性。在这种场景中,使用默认的 USER 共享级别,每个用户只使用一个 Spark engine,配合使用 Spark 的动态伸缩特性,动态的创建和销毁 executor,在保证用户之间隔离的基础上,降低启动时间和资源占用。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT1H
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=30
spark.dynamicAllocation.executorIdleTimeout=120
(2)使用 Beeline 运行批任务
SQL 该场景会使用统一的 batch 账号提交 SQL 任务,对响应时间敏感度低,但对稳定性要求非常要,并且应尽可能的最大化利用集群资源。推荐在该场景下将共享级别调整为 CONNECTION,这样每个 SQL 执行将会使用独立的 Spark driver,并且 SQL 执行完毕后 Spark driver 立即退出,保障离线任务互不干扰,并且资源及时释放。建议的关键配置如下:
kyuubi.engine.share.level=CONNECTION
spark.dynamicAllocation.enabled=true
# 根据任务具体资源消耗估算,从 workflow 整体上提升集群资源利用率
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=30
(3)使用 Superset 进行多数据源联邦查询
该场景中,只有一个 service 账号,会定时同时刷新大量的图表,对连接创建时间、查询结果响应时间、并发度都有较高的要求。该场景中,driver 和 executor 的启动时间都是不容忽视的,因此在 ad-hoc 查询配置的基础上,应延长 driver、executor 的闲置等待时间,并且设定最小的 executor 保活数量,保证时刻有 executor 常驻,能快速响应较小的查询。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=6
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.executorIdleTimeout=120
我们可以启动三个 Kyuubi Server(单机或集群)来分别应对如上的三种场景,但也可以仅启动一个 Kyuubi Server(单机或集群)来满足不同的场景。
一种建议的实践方式是 Kyuubi Server 配置使用默认的 USER 共享级别,这样客户端连接时,会使用默认配置;当 ETL 跑批场景时,可以通过 beeline 参数将本次连接共享级别调整为 CONNECTION;类似的,在 Superset 连接中配置独立参数。
结语
现在去跑一些 SQL,体验 Spark SQL 带来的性能飞跃吧!
参考链接:
①https://github.com/pan3793/cdh-hive/releases/tag/v2.1.1-cdh6.3.1-fix
②https://spark.apache.org/docs/latest/core-migration-guide.html
③https://spark.apache.org/docs/latest/sql-migration-guide.html
④https://kyuubi.readthedocs.io/en/latest/index.html
⑤https://kyuubi.apache.org/docs/latest/overview/architecture.html
⑥https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_hive.html
⑦https://kyuubi.apache.org/docs/latest/overview/kyuubi_vs_thriftserver.html
⑧https://kyuubi.apache.org/docs/latest/deployment/on_yarn.html
⑨https://kyuubi.apache.org/docs/latest/deployment/settings.html
⑩https://kyuubi.apache.org/docs/latest/deployment/spark/dynamic_allocation.html
⑪https://kyuubi.apache.org/docs/latest/deployment/spark/aqe.html
⑫https://kyuubi.apache.org/docs/latest/client/hive_jdbc.html
⑬https://kyuubi.apache.org/docs/latest/quick_start/quick_start_with_hue.html#for-cdh-6-x-users
潘成,来自 Apache Kyuubi 社区
赠书福利