Hudi Guide | The Definitive Guide to Testing and Operating Apache Hudi
Note: this document is based on a Kerberized BEH-7.6 cluster and Spark 2.4.4.
1. Generating Test Data
1.1 Data Sources
1.1.1 Hudi's official test data
https://github.com/apache/hudi/tree/master/docker/demo/data
1.1.2 Data adapted from Hive TPC-DS data
The data is adapted from TPC-DS output; see the GitLab snippet for details:
https://code.bonc.com.cn/bdcut/hive-testbench/snippets/10
A few sample records:
[hadoop@hadoop01 ~]$ head tpc-ds.json
{"customer_id":"AAAAAAAAOKKNEKBA","shop_date":"1999/09/25","sum_cost":"140.33000069856644"}
{"customer_id":"AAAAAAAAPNAJLMAA","shop_date":"2002/12/26","sum_cost":"29.320000052452087"}
{"customer_id":"AAAAAAAAHNPHCLBA","shop_date":"1999/07/18","sum_cost":"45.949999272823334"}
{"customer_id":"AAAAAAAAANHGIPAA","shop_date":"1998/10/04","sum_cost":"21.369999915361404"}
{"customer_id":"AAAAAAAAPAHKHIBA","shop_date":"2001/12/13","sum_cost":"58.009999826550484"}
{"customer_id":"AAAAAAAABNJBBABA","shop_date":"1998/11/06","sum_cost":"205.01999327540398"}
{"customer_id":"AAAAAAAAILDDDKBA","shop_date":"1998/02/15","sum_cost":"107.06000108271837"}
{"customer_id":"AAAAAAAAHFJMPEAA","shop_date":"1999/09/29","sum_cost":"43.04000025987625"}
{"customer_id":"AAAAAAAAIBOOIHAA","shop_date":"2000/10/16","sum_cost":"122.53999684005976"}
{"customer_id":"AAAAAAAAGEHOPKBA","shop_date":"2001/09/13","sum_cost":"73.4099994301796"}
1.2 Loading Test Data into Kafka
1.2.1 Using kafkacat
1.2.1.1 Introduction
Project: https://github.com/edenhill/kafkacat
kafkacat can be thought of as the netcat of the Kafka toolbox: a generic, non-JVM producer/consumer for Apache Kafka >= 0.8.
1.2.1.2 Non-Kerberized Kafka
1. Produce data
cat batch_1.json | kafkacat -b 172.16.13.116:9092 -t stock_ticks -P
2. View the topic
kafkacat -L -b 172.16.13.116:9092 -t stock_ticks
3. View topic metadata
kafkacat -b 172.16.13.116:9092 -L -J | jq
1.2.1.3 Kerberized Kafka
1. Create the topic
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks
2. Edit the kafkacat.conf file and pass it with -F; run kafkacat -X list to see the configurable parameters.
bootstrap.servers=bdev001.bonc.com:9092,bdev002.bonc.com:9092  # optional
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafka@BONC.COM
sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab
3. Produce data
cat batch.txt | kafkacat \
-F kafkacat.conf \
-b hadoop02.bonc.com:9092 \
-t stock-ticks \
-P
or
cat batch.txt | kafkacat \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=GSSAPI \
-X sasl.kerberos.service.name=kafka \
-X sasl.kerberos.principal=kafka@BONC.COM \
-X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
-b hadoop02.bonc.com:9092 \
-t stock-ticks \
-P
4. View the topic
kafkacat -L \
-b hadoop02.bonc.com:9092 \
-F kafkacat.conf \
-t stock-ticks
5. View topic metadata
kafkacat \
-b hadoop02.bonc.com:9092 \
-F kafkacat.conf \
-L -J | jq
1.2.2 Using Kafka's bundled test tool
Kafka's bundled producer performance tool, kafka-producer-perf-test.sh, can also be used to load the data.
### Create the topics
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic store-sales
### Produce Hudi's bundled sample data
kafka-producer-perf-test.sh --topic stock-ticks --throughput 1000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file batch_1.json --num-records 3482
### Produce the adapted TPC-DS data
kafka-producer-perf-test.sh --topic store-sales --throughput 100000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file store_sales.json --num-records 479456
Notes:
--payload-delimiter: record delimiter in the payload file; defaults to a newline (\n).
--throughput: required; throttles throughput to roughly this value in msg/s (a very large value may simply not be reached).
--num-records: number of messages to produce; it must be no larger than the number of records in the payload file.
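The producer.properties referenced above is not reproduced in this guide; on this Kerberized cluster it would plausibly mirror the kafkacat settings shown earlier, roughly:
# Assumed contents of /opt/beh/core/kafka/config/producer.properties (verify against the actual file)
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";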
1.3 Writing to HDFS
1.3.1 stock_ticks
1. Edit kafka-source.properties
include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=stock-ticks
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";
2. base.properties
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
3. schema.avsc
{
"type":"record",
"name":"stock_ticks",
"fields":[{
"name": "volume",
"type": "long"
}, {
"name": "ts",
"type": "string"
}, {
"name": "symbol",
"type": "string"
},{
"name": "year",
"type": "int"
},{
"name": "month",
"type": "string"
},{
"name": "high",
"type": "double"
},{
"name": "low",
"type": "double"
},{
"name": "key",
"type": "string"
},{
"name": "date",
"type":"string"
}, {
"name": "close",
"type": "double"
}, {
"name": "open",
"type": "double"
}, {
"name": "day",
"type":"string"
}
]}
4. Write to HDFS: Copy-on-Write table
### Local mode
spark-submit \
--master local[2] \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
### Spark on YARN mode
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
5. Write to HDFS: Merge-on-Read table
### Local mode
spark-submit \
--master local[2] \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction
### Spark on YARN mode
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction
1.3.2 store_sales
1. Edit kafka-source.properties
include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-sales-1
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";
2. base.properties
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
3. schema.avsc
{
"type":"record",
"name":"store_sales",
"fields":[{
"name": "customer_id",
"type": "string"
}, {
"name": "shop_date",
"type": "string"
}, {
"name": "sum_cost",
"type": "double"
}
]}
1.4 Syncing Hudi Data to the Hive Metastore
1. Prerequisite: a working Hive environment.
2. Take run_sync_tool.sh from the hudi-hive module of the Hudi build:
${Hoodie-Build}/hudi-hive/run_sync_tool.sh
3. Adjust run_sync_tool.sh
[hadoop@hadoop02 tools]$ git diff run_sync_tool.sh.template run_sync_tool.sh
diff --git a/run_sync_tool.sh.template b/run_sync_tool.sh
index 42d2b9a..66c8180 100755
--- a/run_sync_tool.sh.template
+++ b/run_sync_tool.sh
@@ -47,9 +47,11 @@ if [ -z "${HIVE_JDBC}" ]; then
HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'`
fi
HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
-HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+# HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+HIVE_JARS=`ls ${HIVE_HOME}/lib/*.jar | tr '\n' ':'`
-HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/
+# HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/li
+HADOOP_HIVE_JARS=${HIVE_JARS}:${HIVE_HOME}/conf:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
echo "Running Command : java -cp ${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:$HUDI_HIVE_UBER_JAR org.apache.hudi.hive.HiveSyncTool $@"
java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR} org.apache.hudi.hive.HiveSyncTool "$@"
4. Place hudi-utilities-*.jar under ${HIVE_HOME}/lib and restart the Hive services.
5. Initialize Kerberos credentials and create the hudi_stock database:
$ kinit hadoop
$ beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM"
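The DDL itself is not shown above; inside that beeline session the database would be created with a statement along these lines:
-- sketch: run inside the beeline session opened above
create database if not exists hudi_stock;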
6. Sync the stock_ticks_cow table written to HDFS in section 1.3
./run_sync_tool.sh \
--jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
--user hadoop \
--pass hadoop \
--partitioned-by dt \
--base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--database hudi_stock \
--table stock_ticks_cow
Notes:
1. Running the tool requires a Kerberos ticket cached via kinit.
2. Even when the ticket cache is used, --user and --pass must still be supplied; the script requires them.
7. Sync the stock_ticks_mor table written to HDFS in section 1.3
./run_sync_tool.sh \
--jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
--user hadoop \
--pass hadoop \
--partitioned-by dt \
--base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--database hudi_stock \
--table stock_ticks_mor
8. Check the sync result
0: jdbc:hive2://hadoop02.bonc.com:10000/> show tables from hudi_stock;
+---------------------+
| tab_name            |
+---------------------+
| stock_ticks_cow     |
| stock_ticks_mor_ro  |
| stock_ticks_mor_rt  |
+---------------------+
Explanation:
Table stock_ticks_cow is created by step 6 and supports snapshot and incremental queries via HoodieParquetInputFormat.
Tables stock_ticks_mor_rt and stock_ticks_mor_ro are created by step 7.
stock_ticks_mor_rt supports snapshot and incremental queries via HoodieParquetRealtimeInputFormat, while stock_ticks_mor_ro supports read-optimized queries via HoodieParquetInputFormat.
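To confirm which InputFormat each synced table actually registered, the same beeline session can be used; a quick check might look like this (the expected values in the comments are what Hudi is supposed to register):
-- optional check, run in beeline against the hudi_stock database
-- stock_ticks_cow / stock_ticks_mor_ro are expected to show org.apache.hudi.hadoop.HoodieParquetInputFormat,
-- stock_ticks_mor_rt is expected to show org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
describe formatted stock_ticks_cow;
describe formatted stock_ticks_mor_ro;
describe formatted stock_ticks_mor_rt;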
1.5 Query
1.5.1 Run Hive Queries
Prerequisites: the following issues need to be handled first.
1. Adjust the hive-site.xml configuration and restart the Hive services.
### Change this setting; the default value is org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
<property>
<name>hive.input.format</name>
<value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
</property>
2. Set the parameter: set hive.fetch.task.conversion=none;
3. Copy hudi-utilities-*.jar into ${HIVE_HOME}/lib.
4. Also distribute hudi-hadoop-mr-*.jar into ${HADOOP_HOME}/share/hadoop/common/ and ${HIVE_HOME}/lib/.
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none
show tables;
show partitions stock_ticks_mor_rt;
### Copy_On_Write Queries
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read Queries
# Read Optimized Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
# Snapshot Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
# Projection queries
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
Note: the queries above run two kinds of query against each of the three table types:
First: fetch the latest timestamp;
Second: project a subset of columns over a subset of the current data.
1.5.2 Run Spark-SQL Queries
To run Spark SQL queries, copy hudi-spark-*.jar into ${SPARK_HOME}/jars; the queries also rely indirectly on the hudi-hadoop-mr-*.jar described under Hive queries (deployed into the cluster's Hadoop/Hive installations).
hudi-hadoop-mr-*.jar supplies the Hudi RecordReader implementations.
./bin/spark-sql \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar
Sql:
show databases;
use hudi_stock;
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
./bin/spark-shell \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--driver-class-path /opt/beh/core/hive/conf \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1
Sql:
spark.sql("show databases").show(100, false)
spark.sql("show tables from hudi_stock").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_cow where symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
:quit
1.5.3 Run Presto Queries (succeeded after downgrading the Presto version)
1. Distribute hudi-presto-*.jar to ${PRESTO_HOME}/plugin/hive-hadoop2/ and restart Presto.
2. Adjust ${PRESTO_HOME}/etc/catalog/hive.properties to add the yarn-site configuration file:
hive.config.resources=/opt/beh/core/hadoop/etc/hadoop/core-site.xml,/opt/beh/core/hadoop/etc/hadoop/hdfs-site.xml,/opt/beh/core/hadoop/etc/hadoop/yarn-site.xml
./bin/presto \
--server https://hadoop01.bonc.com:7778 \
--krb5-config-path /etc/krb5.conf \
--krb5-principal hadoop@BONC.COM \
--krb5-keytab-path /opt/beh/metadata/key/presto.keytab \
--krb5-remote-service-name presto \
--keystore-path /opt/beh/metadata/key/hadoop.keystore \
--keystore-password hadoop \
--catalog hive \
--schema hudi_stock
### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read_RO
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
Note: Presto does not support incremental queries; it only supports queries through HoodieParquetInputFormat, i.e. snapshot queries on Copy_On_Write tables and read-optimized queries on the Merge_On_Read _ro tables.
Abnormal termination caused by a version mismatch:
2020-05-28T18:00:07.263+0800 WARN hive-hive-0 io.prestosql.plugin.hive.util.ResumableTasks ResumableTask completed exceptionally
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/client/util/YarnClientUtils
at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:58)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:216)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)
at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.client.util.YarnClientUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more
1.6 Writing the Second Batch of Data
1.6.1 Hudi's bundled sample data
1. Load the batch into Kafka with kafkacat
cat /opt/beh/core/hudi/data/batch_2.json | kafkacat \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=GSSAPI \
-X sasl.kerberos.service.name=kafka \
-X sasl.kerberos.principal=kafka@BONC.COM \
-X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
-b hadoop02.bonc.com:9092 \
-t stock-ticks \
-P
2. Write to HDFS
### Copy_On_Write
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow \
--props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
### Merge_On_Read
spark-submit \
--master yarn \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction
3. Check the write results
### Copy_On_Write
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/2018/08/31
Found 3 items
-rw-------   3 hadoop hadoop     93 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop 443789 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-21_20200530184035.parquet
-rw-------   3 hadoop hadoop 443518 2020-06-01 09:44 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-24_20200601094404.parquet
### Merge_On_Read
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 3 items
-rw-------   3 hadoop hadoop  21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------   3 hadoop hadoop     93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop 443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet
Note: the Copy_On_Write table writes the merged data directly into new columnar (Parquet) files,
while the Merge_On_Read table appends the increment to a delta log file.
1.6.2 Re-syncing to the Hive metastore
Since this batch does not create any new partitions, no re-sync is needed.
1.7 Query
1.7.1 Run Hive Queries
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none
### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
Note: the concrete query results are not reproduced here; see the Docker Demo chapter on the official site: http://hudi.apache.org/docs/docker_demo.html
The main point of this exercise is to demonstrate the underlying write mechanics of Copy_On_Write and Merge_On_Read.
1.7.2 Run Spark-SQL Queries
./bin/spark-sql \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar
### Switch database
show databases;
use hudi_stock;
### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
1.7.3 Run Presto Queries
Rebuild prestosql presto-302; the issues hit while building and testing are recorded in section 4.4.
Copy hudi-presto-bundle-0.5.2-incubating.jar into ${PRESTO_HOME}/plugin/hive-hadoop2 and restart Presto.
1.8 Incremental Queries on Copy_On_Write
1. Hive Query
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time  | symbol  | ts                   | volume  | open       | close     |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20200530184035       | GOOG    | 2018-08-31 09:59:00  | 6330    | 1230.5     | 1230.02   |
| 20200601094404       | GOOG    | 2018-08-31 10:59:00  | 9021    | 1227.1993  | 1227.215  |
+----------------------+---------+----------------------+---------+------------+-----------+
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';
Note: the timestamp supplied here falls between the two commit times.
2. Spark-SQL
./bin/spark-sql \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';
Note: the timestamp supplied here falls between the two commit times.
3. Spark-shell
./bin/spark-shell \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--driver-class-path /opt/beh/core/hive/conf \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1
import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200530185035").load("/user/hive/warehouse/stock_ticks_cow")
hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);
1.9 Scheduling and Running Compaction on the Merge_On_Read Table
1. Build Hudi and copy the build output to an admin node.
2. Go to the hudi-cli module
[hadoop@hadoop02 hudi]$ pwd
/home/hadoop/hudi
[hadoop@hadoop02 hudi]$ tree -L 2 hudi-cli
hudi-cli
├── conf
│ └── hudi-env.sh
├── hoodie-cmd.log
├── hudi-cli.sh
├── pom.xml
├── src
│ └── main
└── target
├── checkstyle-cachefile
├── checkstyle-checker.xml
├── checkstyle-result.xml
├── checkstyle-suppressions.xml
├── classes
├── classes.-502447588.timestamp
├── generated-sources
├── hudi-cli-0.5.2-incubating.jar
├── hudi-cli-0.5.2-incubating-sources.jar
├── lib
├── maven-archiver
├── maven-shared-archive-resources
├── maven-status
├── rat.txt
└── test-classes
3. Edit hudi-cli.sh
### echo `hadoop classpath`
/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:/opt/beh/core/hadoop/contrib/capacity-scheduler/*.jar
### Edit hudi-cli.sh and add the Hadoop dependencies to the classpath
diff --git a/tmp/hudi-cli.sh b/hudi-cli/hudi-cli.sh
index b6e708c..3086203 100755
--- a/tmp/hudi-cli.sh
+++ b/hudi-cli/hudi-cli.sh
@@ -25,4 +25,4 @@ if [ -z "$CLIENT_JAR" ]; then
echo "Client jar location not set, please set it in conf/hudi-env.sh"
fi
-java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
+java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
4. Upload the schema.avsc defined when writing to Kafka to HDFS
[hadoop@hadoop02 hudi]$ hdfs dfs -ls /var/demo/config/schema.avsc
-rw-------   3 hadoop hadoop 1464 2020-06-01 14:58 /var/demo/config/schema.avsc
5. Run hudi-cli.sh
[hadoop@hadoop02 hudi-cli]$ ./hudi-cli.sh
hudi->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction schedule
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction run --compactionInstant 20200601144520 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
6. Inspect the Hudi data files
[hadoop@hadoop02 hudi]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 4 items
-rw-------   3 hadoop hadoop  21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------   3 hadoop hadoop     93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------   3 hadoop hadoop 443479 2020-06-01 14:59 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-0-0_20200601144520.parquet
-rw-------   3 hadoop hadoop 443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet
1.10 Hive Queries ON Merge_On_Read
beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
# Incremental Query
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111';
Note: the commit time supplied to the incremental query is the compaction commit timestamp. The official demo uses 2018 timestamps, but at the time this document was written it is 2020.
1.11 Spark-SQL ON Merge_On_Read
./bin/spark-shell \
--jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
--driver-class-path /opt/beh/core/hive/conf \
--master yarn \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1
# Read Optimized Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
# Snapshot Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
# Incremental Query
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111'").show(100, false)
2. Administration
Two administrative operations occur during testing:
• syncing to the Hive metastore with the run_sync_tool.sh script;
• running compaction on the Merge_On_Read table through hudi-cli.sh.
Both rely on the jars produced by the Hudi build, so it may be worth planning a dedicated directory layout for Hudi, for example as sketched below.
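One possible layout, purely as an assumption (the paths match those used elsewhere in this guide; Hudi itself does not mandate any of this):
# sketch of a possible /opt/beh/core/hudi layout (assumption, not required by Hudi)
mkdir -p /opt/beh/core/hudi/{jars,config,data,tools}
# jars/   : hudi-utilities-bundle, hudi-spark-bundle, hudi-hadoop-mr, hudi-presto-bundle
# config/ : kafka-source.properties, base.properties, schema.avsc per table
# data/   : batch_1.json, batch_2.json and other sample payloads
# tools/  : run_sync_tool.sh and the hudi-cli module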
3. Load Testing with Custom Data
Note: limitations of the custom data.
First, it is all newly inserted data and contains no updates. The test uses the default write operation, upsert; insert and bulk_insert are also supported, as the sketch after this note shows.
Second, the data does not arrive incrementally over time: essentially all partitions are already populated by the first batch.
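Because the custom data is pure inserts, the DeltaStreamer job from section 3.2.2 could in principle be switched away from the default upsert via the --op flag. The sketch below mirrors the command in section 3.2.2, with --op BULK_INSERT as the only (assumed, untested here) change:
nohup spark-submit \
--master yarn \
--deploy-mode cluster \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--op BULK_INSERT \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field shop_date \
--target-base-path hdfs://beh001/user/hive/warehouse/store_sales_cow \
--target-table store_sales_cow \
--props hdfs://beh001/hudi/store_sales/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &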
3.1 Data Preparation: Loading into Kafka
1. For preparing the custom data, see section 1.1.2.
2. For loading it into Kafka and the related configuration, see section 1.2.
3.2 Writing from Kafka to HDFS
3.2.1 Configuration
1. base.properties
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
2. schema.avsc
{
"type":"record",
"name":"store_sales",
"fields":[{
"name": "customer_id",
"type": "string"
}, {
"name": "shop_date",
"type": "string"
}, {
"name": "sum_cost",
"type": "double"
}
]}
3. kafka-source.properties
include=hdfs://beh001/hudi/store2/base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://beh001/hudi/store2/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://beh001/hudi/store2/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-2
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";
Note: the configuration files are uploaded to HDFS, which is why every file reference inside them starts with hdfs://. A sketch of the upload follows.
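The upload itself is not shown above; it would presumably be something like this (paths per the kafka-source.properties shown here):
# sketch: upload the configs referenced above to HDFS
hdfs dfs -mkdir -p /hudi/store2
hdfs dfs -put base.properties schema.avsc kafka-source.properties /hudi/store2/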
3.2.2 Continuous Writes to the COW Table
nohup spark-submit \
--master yarn \
--num-executors 12 \
--executor-memory 18G \
--executor-cores 6 \
--deploy-mode cluster \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--continuous \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field shop_date \
--target-base-path hdfs://beh001/user/hive/warehouse/store_sales_cow \
--target-table store_sales_cow \
--props hdfs://beh001/hudi/store_sales/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &
Load-test results:
CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
20200623114826 | 9.5 GB | 0 | 1823 | 1823 | 555059000 | 0 | 0 |
20200623114342 | 9.4 GB | 0 | 1823 | 1823 | 551850199 | 0 | 0 |
20200623113915 | 9.4 GB | 0 | 1823 | 1823 | 546850199 | 0 | 0 |
20200623113438 | 9.3 GB | 0 | 1823 | 1823 | 541850199 | 0 | 0 |
20200623113009 | 9.2 GB | 0 | 1823 | 1823 | 536850199 | 0 | 0 |
20200623112534 | 9.1 GB | 0 | 1823 | 1823 | 531850199 | 0 | 0 |
20200623112104 | 9.0 GB | 0 | 1823 | 1823 | 526850199 | 0 | 0 |
20200623111641 | 9.0 GB | 0 | 1823 | 1823 | 521850199 | 0 | 0 |
20200623111205 | 8.9 GB | 0 | 1823 | 1823 | 516850199 | 0 | 0 |
20200623110736 | 8.8 GB | 0 | 1823 | 1823 | 511850199 | 0 | 0 |
20200623110320 | 8.7 GB | 0 | 1823 | 1823 | 506850200 | 0 | 0 |
20200623105855 | 8.7 GB | 0 | 1823 | 1823 | 501850200 | 0 | 0 |
20200623105435 | 8.6 GB | 0 | 1823 | 1823 | 496850200 | 0 | 0 |
20200623105000 | 8.5 GB | 0 | 1823 | 1823 | 491850200 | 0 | 0 |
20200623104543 | 8.4 GB | 0 | 1823 | 1823 | 486850200 | 0 | 0 |
20200623104120 | 8.3 GB | 0 | 1823 | 1823 | 481850200 | 0 | 0 |
20200623103705 | 8.3 GB | 0 | 1823 | 1823 | 476850200 | 0 | 0 |
20200623103305 | 8.2 GB | 0 | 1823 | 1823 | 471850200 | 0 | 0 |
20200623102848 | 8.1 GB | 0 | 1823 | 1823 | 466850200 | 0 | 0 |
20200623102440 | 8.0 GB | 0 | 1823 | 1823 | 461850200 | 0 | 0 |
20200623102030 | 7.9 GB | 0 | 1823 | 1823 | 456850200 | 0 | 0 |
20200623101628 | 7.9 GB | 0 | 1823 | 1823 | 451850200 | 0 | 0 |
20200623101229 | 7.8 GB | 0 | 1823 | 1823 | 446850200 | 0 | 0 |
3.2.3 Continuous Writes to the MOR Table
nohup spark-submit \
--master yarn \
--num-executors 12 \
--executor-memory 18G \
--executor-cores 6 \
--deploy-mode cluster \
--keytab /opt/beh/metadata/key/presto.keytab \
--principal hadoop@BONC.COM \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type MERGE_ON_READ \
--continuous \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field shop_date \
--target-base-path hdfs://beh001/user/hive/warehouse/store_sales_mor \
--target-table store_sales_mor \
--props hdfs://beh001/hudi/store_sales/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &
Load-test results:
CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
20200623165326 | 3.6 GB | 0 | 1823 | 1823 | 179999235 | 0 | 0 |
20200623165027 | 3.5 GB | 0 | 1823 | 1823 | 174999235 | 0 | 0 |
20200623164729 | 3.4 GB | 0 | 1823 | 1823 | 169999235 | 0 | 0 |
20200623164431 | 3.3 GB | 0 | 1823 | 1823 | 164999235 | 0 | 0 |
20200623164141 | 3.3 GB | 0 | 1823 | 1823 | 159999235 | 0 | 0 |
20200623163838 | 3.2 GB | 0 | 1823 | 1823 | 154999235 | 0 | 0 |
20200623163550 | 3.1 GB | 0 | 1823 | 1823 | 149999235 | 0 | 0 |
20200623163254 | 3.0 GB | 0 | 1823 | 1823 | 144999235 | 0 | 0 |
20200623163017 | 3.0 GB | 0 | 1823 | 1823 | 139999235 | 0 | 0 |
20200623162735 | 2.9 GB | 0 | 1823 | 1823 | 134999235 | 0 | 0 |
20200623162459 | 2.8 GB | 0 | 1823 | 1823 | 129999235 | 0 | 0 |
20200623162223 | 2.7 GB | 0 | 1823 | 1823 | 124999235 | 0 | 0 |
20200623161945 | 2.6 GB | 0 | 1823 | 1823 | 119999235 | 0 | 0 |
20200623161707 | 2.6 GB | 0 | 1823 | 1823 | 114999235 | 0 | 0 |
20200623161441 | 2.5 GB | 0 | 1823 | 1823 | 109999235 | 0 | 0 |
20200623161211 | 2.4 GB | 0 | 1823 | 1823 | 104999235 | 0 | 0 |
20200623160943 | 2.3 GB | 0 | 1823 | 1823 | 99999235 | 0 | 0 |
20200623160700 | 2.2 GB | 0 | 1823 | 1823 | 94999235 | 0 | 0 |
20200623160440 | 2.2 GB | 0 | 1823 | 1823 | 89999235 | 0 | 0 |
20200623160225 | 2.1 GB | 0 | 1823 | 1823 | 84999235 | 0 | 0 |
20200623160002 | 2.0 GB | 0 | 1823 | 1823 | 79999235 | 0 | 0 |
20200623155741 | 1.9 GB | 0 | 1823 | 1823 | 74999235 | 0 | 0 |
20200623155527 | 1.8 GB | 0 | 1823 | 1823 | 69999235 | 0 | 0 |
3.3 Open Questions
3.3.1 Test data
The custom data consists entirely of inserts and contains no updates.
3.3.2 No delta log when writing the MOR table
Because the custom data contains no updates, writing the MOR table produced no delta log. After adjusting the custom data to include some updates and writing the MOR table again, delta logs are indeed generated (see the quick check below).
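A quick way to confirm the presence of delta log files after the update run, as a sketch (assuming the store_sales_mor base path from section 3.2.3):
# list MOR delta log files (their names contain ".log.")
hdfs dfs -ls -R /user/hive/warehouse/store_sales_mor | grep '\.log\.'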
4. Issue Log
4.1 Using kafkacat
[hadoop@bdev001 beh]$ cat batch_1.json | kafkacat -F kafkacat.conf -t zjh -P
% Reading configuration from file kafkacat.conf
%2|1591338623.769|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev001.bonc.com:9092/bootstrap]: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338624.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev003.bonc.com:9092/bootstrap]: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338625.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev002.bonc.com:9092/bootstrap]: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
% ERROR: Local: All broker connections are down: 3/3 brokers are down : terminating
Fix:
yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi
4.2 Hive Queries
When running Hive queries, the ApplicationMaster reports the following exception:
ERROR : Job failed with java.lang.ClassNotFoundException: org.apache.avro.LogicalType
java.util.concurrent.ExecutionException: Exception thrown by job
at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337)
at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342)
at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:382)
at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 4, bdev002.bonc.com, executor 1): java.lang.NoClassDefFoundError: org/apache/avro/LogicalType
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 23 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.NoClassDefFoundError: org/apache/avro/LogicalType
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 23 more
Fix:
Investigation showed that the avro-1.7.4.jar shipped with our Hadoop indeed does not contain the class, while the newer avro-1.8.2.jar found in the Hudi build tree does. We replaced the older Avro jars under /opt/beh/core/hadoop/share/hadoop/common/lib and /opt/beh/core/hive/lib/ with the newer version (sketched below), and the queries then succeeded.
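The swap can be scripted roughly as follows; this is only a sketch, the source location of avro-1.8.2.jar inside the Hudi build is not recorded here and the path below is hypothetical, and keeping a backup of the old jars is advisable:
# AVRO_JAR: hypothetical path to the newer Avro jar taken from the Hudi build tree
AVRO_JAR=/home/hadoop/hudi/avro-1.8.2.jar
for d in /opt/beh/core/hadoop/share/hadoop/common/lib /opt/beh/core/hive/lib; do
  mv "$d"/avro-1.7.4.jar "$d"/avro-1.7.4.jar.bak   # keep a backup of the old jar
  cp "$AVRO_JAR" "$d"/
done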
4.3 Presto Queries
1. HoodieException
org.apache.hudi.exception.HoodieException: Error reading Hoodie partition metadata for hdfs://beh001/user/hive/warehouse/stock_ticks_cow/2018/08/31
### Enable debug logging on the Presto coordinator
Edit ${PRESTO_HOME}/etc/log.properties:
io.prestosql=DEBUG
The log then shows:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=kafka, access=EXECUTE, inode="/user/hive/warehouse":hadoop:hadoop:drwx------
The root cause is that the kafka user has no permission on the warehouse directory.
Why the kafka user? Because Presto had refreshed its cache.
4.4 Build and Test
4.4.1 Hadoop native libraries cannot be loaded
java.lang.ExceptionInInitializerError
at io.prestosql.plugin.hive.HdfsEnvironment$$FastClassByGuice$$2c8553d4.newInstance(
at com.google.inject.internal.DefaultConstructionProxyFactory$FastClassProxy.newInstance(DefaultConstructionProxyFactory.java:89)
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:114)
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:42)
at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:65)
at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:113)
at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)
at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)
at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:62)
at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)
at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)
at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)
at com.google.inject.internal.InternalInjectorCreator.loadEagerSingletons(InternalInjectorCreator.java:211)
at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:182)
at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:109)
at com.google.inject.Guice.createInjector(Guice.java:87)
at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:240)
at io.prestosql.plugin.hive.HiveConnectorFactory.create(HiveConnectorFactory.java:123)
at io.prestosql.connector.ConnectorManager.createConnector(ConnectorManager.java:321)
at io.prestosql.connector.ConnectorManager.addCatalogConnector(ConnectorManager.java:195)
at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:187)
at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:173)
at io.prestosql.metadata.StaticCatalogStore.loadCatalog(StaticCatalogStore.java:96)
at io.prestosql.metadata.StaticCatalogStore.loadCatalogs(StaticCatalogStore.java:74)
at io.prestosql.server.PrestoServer.run(PrestoServer.java:122)
at io.prestosql.server.PrestoServer.main(PrestoServer.java:68)
Caused by: java.lang.RuntimeException: failed to load Hadoop native library
at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:58)
at io.prestosql.plugin.hive.HdfsEnvironment.<init>
... 31 more
Caused by: java.lang.RuntimeException: native snappy library not available: SnappyCompressor has not been loaded.
at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:72)
at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
at io.prestosql.hadoop.HadoopNative.loadAllCodecs(HadoopNative.java:71)
at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:52)
... 32 more
Fix:
export LD_LIBRARY_PATH=/opt/beh/core/hadoop/lib/native
Alternatively, handle it at build time.
4.4.2 org.apache.hadoop.yarn.conf.HAUtil cannot be loaded
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/HAUtil
at org.apache.hadoop.mapred.Master.getMasterAddress(Master.java:61)
at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:88)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:213)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)
at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)
at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.HAUtil
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 more
Fix: copy the hadoop-yarn-api jar into ${PRESTO_HOME}/plugin/hive-hadoop2.
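As a sketch (the exact jar version depends on the Hadoop release in use on this cluster):
# copy the YARN API jar into the Presto hive-hadoop2 plugin directory, then restart Presto
cp /opt/beh/core/hadoop/share/hadoop/yarn/hadoop-yarn-api-*.jar ${PRESTO_HOME}/plugin/hive-hadoop2/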