Spark 实践 | Spark SQL TPC-H Cookbook
版本号
•Apache Spark 3.1.1 / 2.4.7•Apache Hadoop 2.7.7•TiSpark 2.3.13
环境
内网 3 台机器,每台 40 核 189 GB 内存 NVMe 3TB 磁盘,相互之间已配置好网络连接。
测试逻辑
Standalone 部署的 Spark SQL 读取 HDFS 上以 Parquet 格式存储的 TPC-H 100 数据,随后执行 Query 并计算 Query 耗时。弱依赖 TiSpark 完成数据导入,也可采用其他数据导入方式。强依赖 spark-sql-perl[1] 提供的执行框架。
测试步骤
搭建 HDFS 集群
下载 Hadoop 2.7.7 压缩包并解压,随后在启动脚本中设置环境变量,配置完成后重新进入 Shell 或通过 source
命令使其生效。
export HADOOP_HOME=/path/to/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin
这个操作需要在三台机器上都执行,不确定是否需要同一个路径,但是同一个路径是工作的。
配置 $HADOOP_HOME/etc/hadoop
目录下的 core-site.xml
,hdfs-site.xml
和 slaves
文件,本次性能测试不依赖 YARN 及 MapReduce 组件。
对于 core-site.xml
添加以下配置,根据具体环境相应调整 hostname 和文件路径。
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://h59:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:///root/tison/data/hadoop/tmp</value>
</property>
</configuration>
对于 hdfs-site.xml
添加以下配置,同样根据具体环境微调配置值。
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///root/tison/data/hadoop/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///data2/tison/data/hadoop/data,file:///data1/tison/data/hadoop/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>h85:9001</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
其中 dfs.datanode.data.dir
是一个逗号分隔的文件路径列表,可以只写一个文件路径。配置多个路径时 HDFS 会尝试均匀的写入数据。
对于 slaves
文件,填写按行分隔的 datanote 节点 hostname 信息。
h59
h82
h85
同样,这份配置在所有机器上应该是一致的。配置完成后,所有机器根据配置为 namenode 或 datanode 或两者都是,根据配置值创建相应的数据目录。
以上配置完成后,在 namenode 机器上执行以下命令格式化文件系统。
$ hdfs namenode -format
格式化完成后,在 namenode 机器上执行以下脚本启动 HDFS 集群。
$ $HADOOP_HOME/sbin/start-dfs.sh
启动完成后,在浏览器中浏览 http://h59:50070/
查看集群状态的管控页面,注意网址微调为具体部署的 hostname 或 ip 地址。
启动 Spark 集群
下载 Spark 3.1.1 压缩包并解压,Spark 的执行脚本会智能获取路径信息,通常不需要配置路径信息,但是要求所有节点的 Spark 目录相同。
对于 Standalone 部署,需要配置其他 Worker 节点信息,修改 conf/workers
文件,对于 2.4.7 版本,修改 conf/slaves
文件。
h59
h82
h85
此时即可通过执行以下脚本启动一个 Spark 集群。
$ sbin/start-all.sh
随后,可以通过 Spark Shell 连上集群进行测试。
bin/spark-shell
...
"SHOW DATABASES").show spark.sql(
+------------+
|databaseName|
+------------+
| default|
+------------+
scala>
准备 TPC-H 数据
我们需要在 HDFS 上准备 Parquet 形式的 TPC-H 100 数据。
这个过程有多种做法,我们假设 TiDB 集群已有相应数据,这里介绍使用 TiSpark 导入数据的方式。
TiSpark 的工作方式是通过 Spark Extension 机制改变 Spark SQL 的 Plan 来实现的,我们需要按照 Spark Extension 的机制来配置。
首先需要从 TiSpark 的发布页面[2]下载 2.3.13 的压缩包并解压。
目前,TiSpark 2.3.13 支持 Spark 2.3.x 和 2.4.x 版本,注意保证当前运行的 Spark 集群是相应版本,3.1.1 暂不兼容。
随后启动 Spark Shell 并注意加载 TiSpark 的 jar 包,其中 spark.tispark.pd.addresses
是 TiDB 集群中 PD 集群的通信地址列表。
$ bin/spark-shell --jars /path/to/tispark-assembly-2.3.13.jar --conf spark.sql.extensions=org.apache.spark.sql.TiExtensions --conf spark.tispark.pd.addresses=<host>:<port>,<host>:<port>,<host>:<port>
...
scala> sql("SHOW DATABASES").show
+------------+
|databaseName|
+------------+
| default|
| tpch_100|
| test|
| mysql|
+------------+
scala>
直接从 TiDB 中写出 TPC-H 100 的数据到 HDFS 集群上。
scala> spark.sql("use tpch_100")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|tpch_100| customer| false|
|tpch_100| lineitem| false|
|tpch_100| nation| false|
|tpch_100| orders| false|
|tpch_100| part| false|
|tpch_100| partsupp| false|
|tpch_100| region| false|
|tpch_100| supplier| false|
+--------+---------+-----------+
scala> spark.sql("SELECT * FROM tpch_100.nation").write.parquet("hdfs://h59:9000/tison/tpch100/nation.parquet")
scala> spark.sql("SELECT * FROM tpch_100.region").write.parquet("hdfs://h59:9000/tison/tpch100/region.parquet")
scala> spark.sql("SELECT * FROM tpch_100.customer").write.parquet("hdfs://h59:9000/tison/tpch100/customer.parquet")
scala> spark.sql("SELECT * FROM tpch_100.part").write.parquet("hdfs://h59:9000/tison/tpch100/part.parquet")
scala> spark.sql("SELECT * FROM tpch_100.partsupp").write.parquet("hdfs://h59:9000/tison/tpch100/partsupp.parquet")
scala> spark.sql("SELECT * FROM tpch_100.supplier").write.parquet("hdfs://h59:9000/tison/tpch100/supplier.parquet")
scala> spark.sql("SELECT * FROM tpch_100.orders").write.parquet("hdfs://h59:9000/tison/tpch100/orders.parquet")
scala> spark.sql("SELECT * FROM tpch_100.lineitem").write.parquet("hdfs://h59:9000/tison/tpch100/lineitem.parquet")
可以从 HDFS 的管控页面上看到数据已经导入。
执行 TPC-H 测试
从 Spark Shell 中创建 TPC-H 测试需要的外部表,注意此时不要装载 TiSpark 扩展。
"CREATE DATABASE tpch_100") sql(
"SHOW DATABASES").show sql(
"USE tpch_100") sql(
"nation", "hdfs://h59:9000/tison/tpch100/nation.parquet", "parquet") spark.sqlContext.createExternalTable(
"region", "hdfs://h59:9000/tison/tpch100/region.parquet", "parquet") spark.sqlContext.createExternalTable(
"customer", "hdfs://h59:9000/tison/tpch100/customer.parquet", "parquet") spark.sqlContext.createExternalTable(
"orders", "hdfs://h59:9000/tison/tpch100/orders.parquet", "parquet") spark.sqlContext.createExternalTable(
"part", "hdfs://h59:9000/tison/tpch100/part.parquet", "parquet") spark.sqlContext.createExternalTable(
"partsupp", "hdfs://h59:9000/tison/tpch100/partsupp.parquet", "parquet") spark.sqlContext.createExternalTable(
"supplier", "hdfs://h59:9000/tison/tpch100/supplier.parquet", "parquet") spark.sqlContext.createExternalTable(
"lineitem", "hdfs://h59:9000/tison/tpch100/lineitem.parquet", "parquet") spark.sqlContext.createExternalTable(
从 databricks 的 spark-sql-perf 代码仓库[3]里下载并编译,随后在启动 Saprk Shell 时加入编译出来的 jar 包依赖。
从 spark-sql-perf 代码仓库拷贝 src/main/notebooks/tpch_run.scala
文件,这个是执行 TPC-H 性能测试的执行脚本,我们简单调整一些配置来跑上面流程执行下来的 TPC-H 100 环境下的测试。
•scaleFactors
变量改为 Seq(100)
,我们只测 100G 量级的数据。•databaseName
方法我们固定返回 tpch_100
作为数据库名。•queryContent
加载 Query 可以考虑改为本地加载 SQL 文件,保证不同产品的测试 SQL 一致。也可在打包 spark-sql-perf 时改动 resource 文件的内容,但不太灵活。•perfDatasetsLocation
是个废方法,可以删除。•其他变量例如 resultLocation
/ iterations
根据需要相应调整。
调整完成后,从 Spark Shell 中加载脚本执行并确认结果。
scala> :load /path/to/tpch_run.scala
在启动 Spark Shell 时,可以通过调整执行参数来调优结果,下面是一个参考配置。此外,可以考虑修改应用参数 spark.local.dir
指定 Shuffle / Spill 目录到 NVMe 盘上,而不是默认的 /tmp
目录。
$ bin/spark-shell --master spark://h59:7077 \
--driver-memory 20G \
--total-executor-cores 120 \
--executor-cores 5 \
--executor-memory 15G
References
[1]
spark-sql-perl: https://github.com/databricks/spark-sql-perf[2]
发布页面: https://github.com/pingcap/tispark/releases/tag/v2.3.13[3]
代码仓库: https://github.com/databricks/spark-sql-perf