Spark 实践 | Spark SQL TPC-H Cookbook

共 7738字,需浏览 16分钟

 ·

2021-03-26 05:40

版本号

Apache Spark 3.1.1 / 2.4.7Apache Hadoop 2.7.7TiSpark 2.3.13

环境

内网 3 台机器,每台 40 核 189 GB 内存 NVMe 3TB 磁盘,相互之间已配置好网络连接。

测试逻辑

Standalone 部署的 Spark SQL 读取 HDFS 上以 Parquet 格式存储的 TPC-H 100 数据,随后执行 Query 并计算 Query 耗时。弱依赖 TiSpark 完成数据导入,也可采用其他数据导入方式。强依赖 spark-sql-perl[1] 提供的执行框架。

测试步骤

搭建 HDFS 集群

下载 Hadoop 2.7.7 压缩包并解压,随后在启动脚本中设置环境变量,配置完成后重新进入 Shell 或通过 source 命令使其生效。

export HADOOP_HOME=/path/to/hadoop-2.7.7export PATH=$PATH:$HADOOP_HOME/bin

这个操作需要在三台机器上都执行,不确定是否需要同一个路径,但是同一个路径是工作的。

配置 $HADOOP_HOME/etc/hadoop 目录下的 core-site.xmlhdfs-site.xml 和 slaves 文件,本次性能测试不依赖 YARN 及 MapReduce 组件。

对于 core-site.xml 添加以下配置,根据具体环境相应调整 hostname 和文件路径。

<?xml version="1.0" encoding="UTF-8"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration>  <property>    <name>fs.defaultFS</name>    <value>hdfs://h59:9000</value>  </property>  <property>      <name>hadoop.tmp.dir</name>      <value>file:///root/tison/data/hadoop/tmp</value>   </property></configuration>

对于 hdfs-site.xml 添加以下配置,同样根据具体环境微调配置值。

<?xml version="1.0" encoding="UTF-8"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration>    <property>        <name>dfs.namenode.name.dir</name>        <value>file:///root/tison/data/hadoop/hdfs/name</value>    </property>
<property> <name>dfs.datanode.data.dir</name> <value>file:///data2/tison/data/hadoop/data,file:///data1/tison/data/hadoop/data</value> </property>
<property> <name>dfs.replication</name> <value>3</value> </property>
<property> <name>dfs.namenode.secondary.http-address</name> <value>h85:9001</value> </property> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property></configuration>

其中 dfs.datanode.data.dir 是一个逗号分隔的文件路径列表,可以只写一个文件路径。配置多个路径时 HDFS 会尝试均匀的写入数据。

对于 slaves 文件,填写按行分隔的 datanote 节点 hostname 信息。

h59h82h85

同样,这份配置在所有机器上应该是一致的。配置完成后,所有机器根据配置为 namenode 或 datanode 或两者都是,根据配置值创建相应的数据目录。

以上配置完成后,在 namenode 机器上执行以下命令格式化文件系统。

$ hdfs namenode -format

格式化完成后,在 namenode 机器上执行以下脚本启动 HDFS 集群。

$ $HADOOP_HOME/sbin/start-dfs.sh

启动完成后,在浏览器中浏览 http://h59:50070/ 查看集群状态的管控页面,注意网址微调为具体部署的 hostname 或 ip 地址。

启动 Spark 集群

下载 Spark 3.1.1 压缩包并解压,Spark 的执行脚本会智能获取路径信息,通常不需要配置路径信息,但是要求所有节点的 Spark 目录相同。

对于 Standalone 部署,需要配置其他 Worker 节点信息,修改 conf/workers 文件,对于 2.4.7 版本,修改 conf/slaves 文件。

h59h82h85

此时即可通过执行以下脚本启动一个 Spark 集群。

$ sbin/start-all.sh

随后,可以通过 Spark Shell 连上集群进行测试。

$ bin/spark-shell...scala> spark.sql("SHOW DATABASES").show+------------+|databaseName|+------------+|     default|+------------+

scala>

准备 TPC-H 数据

我们需要在 HDFS 上准备 Parquet 形式的 TPC-H 100 数据。

这个过程有多种做法,我们假设 TiDB 集群已有相应数据,这里介绍使用 TiSpark 导入数据的方式。

TiSpark 的工作方式是通过 Spark Extension 机制改变 Spark SQL 的 Plan 来实现的,我们需要按照 Spark Extension 的机制来配置。

首先需要从 TiSpark 的发布页面[2]下载 2.3.13 的压缩包并解压。

目前,TiSpark 2.3.13 支持 Spark 2.3.x 和 2.4.x 版本,注意保证当前运行的 Spark 集群是相应版本,3.1.1 暂不兼容。

随后启动 Spark Shell 并注意加载 TiSpark 的 jar 包,其中 spark.tispark.pd.addresses 是 TiDB 集群中 PD 集群的通信地址列表。

$ bin/spark-shell --jars /path/to/tispark-assembly-2.3.13.jar --conf spark.sql.extensions=org.apache.spark.sql.TiExtensions --conf spark.tispark.pd.addresses=<host>:<port>,<host>:<port>,<host>:<port>...scala> sql("SHOW DATABASES").show+------------+|databaseName|+------------+|     default||    tpch_100||        test||       mysql|+------------+

scala>

直接从 TiDB 中写出 TPC-H 100 的数据到 HDFS 集群上。

scala> spark.sql("use tpch_100")res0: org.apache.spark.sql.DataFrame = []scala> spark.sql("show tables").show+--------+---------+-----------+|database|tableName|isTemporary|+--------+---------+-----------+|tpch_100| customer|      false||tpch_100| lineitem|      false||tpch_100|   nation|      false||tpch_100|   orders|      false||tpch_100|     part|      false||tpch_100| partsupp|      false||tpch_100|   region|      false||tpch_100| supplier|      false|+--------+---------+-----------+
scala> spark.sql("SELECT * FROM tpch_100.nation").write.parquet("hdfs://h59:9000/tison/tpch100/nation.parquet")scala> spark.sql("SELECT * FROM tpch_100.region").write.parquet("hdfs://h59:9000/tison/tpch100/region.parquet")scala> spark.sql("SELECT * FROM tpch_100.customer").write.parquet("hdfs://h59:9000/tison/tpch100/customer.parquet")scala> spark.sql("SELECT * FROM tpch_100.part").write.parquet("hdfs://h59:9000/tison/tpch100/part.parquet")scala> spark.sql("SELECT * FROM tpch_100.partsupp").write.parquet("hdfs://h59:9000/tison/tpch100/partsupp.parquet")scala> spark.sql("SELECT * FROM tpch_100.supplier").write.parquet("hdfs://h59:9000/tison/tpch100/supplier.parquet")scala> spark.sql("SELECT * FROM tpch_100.orders").write.parquet("hdfs://h59:9000/tison/tpch100/orders.parquet")scala> spark.sql("SELECT * FROM tpch_100.lineitem").write.parquet("hdfs://h59:9000/tison/tpch100/lineitem.parquet")

可以从 HDFS 的管控页面上看到数据已经导入。

执行 TPC-H 测试

从 Spark Shell 中创建 TPC-H 测试需要的外部表,注意此时不要装载 TiSpark 扩展。

scala> sql("CREATE DATABASE tpch_100")scala> sql("SHOW DATABASES").showscala> sql("USE tpch_100")scala> spark.sqlContext.createExternalTable("nation", "hdfs://h59:9000/tison/tpch100/nation.parquet", "parquet")scala> spark.sqlContext.createExternalTable("region", "hdfs://h59:9000/tison/tpch100/region.parquet", "parquet")scala> spark.sqlContext.createExternalTable("customer", "hdfs://h59:9000/tison/tpch100/customer.parquet", "parquet")scala> spark.sqlContext.createExternalTable("orders", "hdfs://h59:9000/tison/tpch100/orders.parquet", "parquet")scala> spark.sqlContext.createExternalTable("part", "hdfs://h59:9000/tison/tpch100/part.parquet", "parquet")scala> spark.sqlContext.createExternalTable("partsupp", "hdfs://h59:9000/tison/tpch100/partsupp.parquet", "parquet")scala> spark.sqlContext.createExternalTable("supplier", "hdfs://h59:9000/tison/tpch100/supplier.parquet", "parquet")scala> spark.sqlContext.createExternalTable("lineitem""hdfs://h59:9000/tison/tpch100/lineitem.parquet""parquet")

从 databricks 的 spark-sql-perf 代码仓库[3]里下载并编译,随后在启动 Saprk Shell 时加入编译出来的 jar 包依赖。

从 spark-sql-perf 代码仓库拷贝 src/main/notebooks/tpch_run.scala 文件,这个是执行 TPC-H 性能测试的执行脚本,我们简单调整一些配置来跑上面流程执行下来的 TPC-H 100 环境下的测试。

scaleFactors 变量改为 Seq(100),我们只测 100G 量级的数据。databaseName 方法我们固定返回 tpch_100 作为数据库名。queryContent 加载 Query 可以考虑改为本地加载 SQL 文件,保证不同产品的测试 SQL 一致。也可在打包 spark-sql-perf 时改动 resource 文件的内容,但不太灵活。perfDatasetsLocation 是个废方法,可以删除。其他变量例如 resultLocation / iterations 根据需要相应调整。

调整完成后,从 Spark Shell 中加载脚本执行并确认结果。

scala> :load /path/to/tpch_run.scala

在启动 Spark Shell 时,可以通过调整执行参数来调优结果,下面是一个参考配置。此外,可以考虑修改应用参数 spark.local.dir 指定 Shuffle / Spill 目录到 NVMe 盘上,而不是默认的 /tmp 目录。

$ bin/spark-shell --master spark://h59:7077 \--driver-memory 20G \--total-executor-cores 120 \--executor-cores 5 \--executor-memory 15G

References

[1] spark-sql-perl: https://github.com/databricks/spark-sql-perf
[2] 发布页面: https://github.com/pingcap/tispark/releases/tag/v2.3.13
[3] 代码仓库: https://github.com/databricks/spark-sql-perf

浏览 91
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报