Hive、SparkSQL是如何决定写文件的数量的?
1. Hive
1.1 without shuffle
Hive在通过SQL写文件是通过MapReduce任务完成的,如下面这个例子:
hive> insert into table temp.czc_hive_test_write values ('col1_value', 1),('col1_value', 2);
在表中插入数据后,可以hdfs对应路径下找到存储的文件
$ hadoop fs -ls /user/hive/warehouse/temp.db/czc_hive_test_write/part_date=2018-12-12
Found 2 items
-rwxrwxrwx 3 hadoop supergroup 26 2019-12-20 15:56 hdfs://sdg/user/hive/warehouse/temp.db/czc_hive_test_write/part_date=2018-12-12/000000_0
可以看到插入生成了1个文件,这是因为每一条插入语句都会单独启动一个MapReduce任务,一个MapReduce任务对应一个结果文件。
1.2 with shuffle
当插入过程有shuffle时:
hive> insert into table temp.czc_hive_game select count(*), game_id from temp.source_table group by game_id;
...
Hadoop job information for Stage-1: number of mappers: 62; number of reducers: 1
...
由Hive实现group by的过程可知,group by的时候会以group by
的字段为key进行shuffle,即上例中的game_id字段。从执行日志中可以看到整个任务启用了62个mapper和1个reducer,由于最终写数据的过程是在reducer中完成,所以最终写数据的文件数量也应该只有1个。
$ hadoop fs -ls /user/hive/warehouse/temp.db/czc_hive_game
Found 1 items
-rwxrwxrwx 3 hadoop supergroup 268 2019-12-20 16:31 /user/hive/warehouse/temp.db/czc_hive_game/000000_0
注:Hive控制reducer数量的规则如下:
Hive自己如何确定reduce数:
reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1G)
hive.exec.reducers.max
即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
Spark SQL
2.1 without shuffle
Spark SQL也可以在hive中操作文件,执行命令
spark.sql("insert into table temp.czc_spark_test_write values ('col1_value', 1),('col1_value', 2)")
Hdfs中文件的存储如下:
$ hadoop fs -ls /user/hive/warehouse/temp.db/czc_spark_test_write
Found 2 items
-rwxrwxrwx 3 hadoop supergroup 13 2019-12-20 17:01 /user/hive/warehouse/temp.db/czc_spark_test_write/part-00000-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
-rwxrwxrwx 3 hadoop supergroup 13 2019-12-20 17:01 /user/hive/warehouse/temp.db/czc_spark_test_write/part-00001-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
$ hadoop fs -cat /user/hive/warehouse/temp.db/czc_spark_test_write/part-00000-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
col1_value 1
$ hadoop fs -cat /user/hive/warehouse/temp.db/czc_spark_test_write/part-00001-0db5ce49-fb85-4c15-bac8-fa2213a03203-c000
col1_value 2
可以发现即使是同一条语句,spark也会启动两个任务区并行的写文件,最终产生了两个文件结果。
2.2 with shuffle
Spark中同样以类似的SQL为例:
scala> spark.sql("insert into table temp.czc_spark_game select count(*), game_id from temp.source_table group by game_id");
res1: org.apache.spark.sql.DataFrame = []
与Hive不同的是,Spark在执行shuffle过程的时候,会为每一个shuffle的key启动一个任务来写数据,上例中的key game_id
在源数据source_table的分布情况是共有26个不同的key。
hive> select count(distinct game_id) from temp.source_table;
OK
26
因此spark会启动26个任务来写数据,在最终的结果文件中也应该有26个文件:
$ hadoop fs -ls hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game
Found 26 items
-rwxrwxrwx 3 hadoop supergroup 0 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00000-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00007-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 7 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00010-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00011-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00012-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00032-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 14 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00036-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00043-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00048-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 24 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00065-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00066-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 16 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00083-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00086-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 16 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00101-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00102-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00105-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 14 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00111-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 12 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00123-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00124-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00136-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00162-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00163-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 10 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00165-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 8 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00174-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 17 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00176-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
-rwxrwxrwx 3 hadoop supergroup 9 2019-12-20 14:46 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00199-2f2213b2-a5b7-4c20-83da-dd25c4b018b9-c000
2.3 解决小文件问题
由于spark的写文件方式,会导致产生很多小文件,会对NameNode造成压力,读写性能变差,为了解决这种小文件问题,spark新的版本(笔者使用2.4.0.cloudera2版本)中支持了动态规划shuffle过程,需要配置spark.sql.adaptive.enabled属性。
scala> spark.sql("set spark.sql.adaptive.enabled=true")
scala> spark.sql("insert into table temp.czc_spark_game select count(*), game_id from temp.source_table group by game_id")
在将spark.sql.adaptive.enabled属性设置为true后,spark写文件的结果为
$ hadoop fs -ls hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game
Found 1 items
-rwxr-xr-x 3 hadoop supergroup 268 2019-12-20 20:55 hdfs://sdg/user/hive/warehouse/temp.db/czc_spark_game/part-00000-a293e3b3-3136-4f57-bf66-f0ee2d4f8dbb-c000
从结果可以看到只有一个文件,这是由于动态规划的作用,在写文件的时候只启动了一个任务。动态规划的细节请参考Adaptive Execution 让 Spark SQL 更高效更智能。