Hudi Guide | The Definitive Guide to Apache Hudi Testing and Operations

HBase Technology Community


2021-07-08

Note: this document is based on Kerberized BEH-7.6 and Spark 2.4.4.


1. Generating Test Data

1.1 Data Sources

1.1.1 Hudi's official test data

https://github.com/apache/hudi/tree/master/docker/demo/data

1.1.2 Data derived from Hive's TPC-DS dataset

The test data is derived from the TPC-DS dataset; see the GitLab snippet for details (a hedged sketch of the general approach follows the sample data below):

https://code.bonc.com.cn/bdcut/hive-testbench/snippets/10

A sample of the data:

[hadoop@hadoop01 ~]$ head tpc-ds.json
{"customer_id":"AAAAAAAAOKKNEKBA","shop_date":"1999/09/25","sum_cost":"140.33000069856644"}
{"customer_id":"AAAAAAAAPNAJLMAA","shop_date":"2002/12/26","sum_cost":"29.320000052452087"}
{"customer_id":"AAAAAAAAHNPHCLBA","shop_date":"1999/07/18","sum_cost":"45.949999272823334"}
{"customer_id":"AAAAAAAAANHGIPAA","shop_date":"1998/10/04","sum_cost":"21.369999915361404"}
{"customer_id":"AAAAAAAAPAHKHIBA","shop_date":"2001/12/13","sum_cost":"58.009999826550484"}
{"customer_id":"AAAAAAAABNJBBABA","shop_date":"1998/11/06","sum_cost":"205.01999327540398"}
{"customer_id":"AAAAAAAAILDDDKBA","shop_date":"1998/02/15","sum_cost":"107.06000108271837"}
{"customer_id":"AAAAAAAAHFJMPEAA","shop_date":"1999/09/29","sum_cost":"43.04000025987625"}
{"customer_id":"AAAAAAAAIBOOIHAA","shop_date":"2000/10/16","sum_cost":"122.53999684005976"}
{"customer_id":"AAAAAAAAGEHOPKBA","shop_date":"2001/09/13","sum_cost":"73.4099994301796"}
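The exact transformation lives at the GitLab snippet above. For orientation only, here is a hedged sketch of how records of this shape could be produced from the standard TPC-DS tables built by hive-testbench (the database name is a placeholder and the column names assume the stock TPC-DS schema; the real snippet may differ):

# Hedged sketch only: aggregate store_sales spend per customer per day and emit one JSON document per line
beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM" <<'SQL'
USE tpcds_text_10;   -- placeholder database name from a hive-testbench build
INSERT OVERWRITE DIRECTORY '/tmp/tpc-ds-json'
SELECT concat('{"customer_id":"', c_customer_id,
              '","shop_date":"',  regexp_replace(cast(d_date AS string), '-', '/'),
              '","sum_cost":"',   cast(sum(ss_net_paid) AS string), '"}')
FROM store_sales
JOIN customer ON ss_customer_sk  = c_customer_sk
JOIN date_dim ON ss_sold_date_sk = d_date_sk
GROUP BY c_customer_id, d_date;
SQL

# Pull the result down as a single file
hdfs dfs -getmerge /tmp/tpc-ds-json tpc-ds.json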

1.2 Loading the Test Data into Kafka

1.2.1 Using kafkacat

1.2.1.1 Introduction

Project address: https://github.com/edenhill/kafkacat

kafkacat can be thought of as the netcat of Kafka tooling: a generic, non-JVM producer/consumer for Apache Kafka >= 0.8.

1.2.1.2 Non-Kerberized Kafka

1. Produce data

cat batch_1.json | kafkacat -b 172.16.13.116:9092 -t stock_ticks -P

2. List the topic

kafkacat -L -b 172.16.13.116:9092 -t stock_ticks

3. View topic metadata

kafkacat -b 172.16.13.116:9092 -L -J | jq

1.2.1.3 Kerberized Kafka

1. Create the topic

kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks

2. Create a kafkacat.conf file and pass it with -F; run `kafkacat -X list` to see the available configuration parameters

bootstrap.servers=bdev001.bonc.com:9092,bdev002.bonc.com:9092
## Optional
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafka@BONC.COM
sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab

3. Produce data

cat batch.txt | kafkacat \
  -F kafkacat.conf \
  -b hadoop02.bonc.com:9092 \
  -t stock-ticks \
  -P

or

cat batch.txt | kafkacat \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanism=GSSAPI \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal=kafka@BONC.COM \
  -X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
  -b hadoop02.bonc.com:9092 \
  -t stock-ticks \
  -P

4. List the topic

kafkacat -L \
  -b hadoop02.bonc.com:9092 \
  -F kafkacat.conf \
  -t stock-ticks

5. View topic metadata

kafkacat \
  -b hadoop02.bonc.com:9092 \
  -F kafkacat.conf \
  -L -J | jq

1.2.2 Using Kafka's built-in test tool

Kafka's bundled producer performance test tool, kafka-producer-perf-test.sh, can also be used to load the data.

### Create the topics
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic stock-ticks
kafka-topics.sh --zookeeper hadoop02.bonc.com:2188/kafka --create --partitions 1 --replication-factor 2 --topic store-sales

### Produce Hudi's bundled data
kafka-producer-perf-test.sh --topic stock-ticks --throughput 1000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file batch_1.json --num-records 3482

### Produce the transformed TPC-DS data
kafka-producer-perf-test.sh --topic store-sales --throughput 100000 --producer.config /opt/beh/core/kafka/config/producer.properties --print-metrics --payload-file store_sales.json --num-records 479456

Notes:

--payload-delimiter: the delimiter between records in the payload file; the default is a newline (\n).

--throughput: required; throttles the send rate (msg/s) to approximately this value. If the value is set very high, the actual rate will fall short of it.

--num-records: the number of messages to produce; this must be at most the number of records in the payload file (see the quick check below).
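A quick way to pick a safe --num-records value is to count the lines in the payload file first (assuming one JSON record per line, as in the samples above):

# One JSON document per line is assumed; the line count is the upper bound for --num-records
wc -l batch_1.json store_sales.json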

1.3 Writing to HDFS

1.3.1 stock_ticks

1. Edit kafka-source.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=stock-ticks
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

2. base.properties

###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

3. schema.avsc

{"type":"record","name":"stock_ticks","fields":[{"name": "volume","type": "long"}, {"name": "ts", "type": "string"}, {"name": "symbol", "type": "string"},{"name": "year", "type": "int"},{"name": "month", "type": "string"},{"name": "high", "type": "double"},{"name": "low", "type": "double"},{"name": "key", "type": "string"},{"name": "date", "type":"string"}, {"name": "close", "type": "double"}, {"name": "open", "type": "double"}, {"name": "day", "type":"string"}]}

4. Write to HDFS: Copy On Write table

### Local mode
spark-submit \
  --master local[2] \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

### Spark on YARN mode
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

5. Write to HDFS: Merge On Read table

### Local mode
spark-submit \
  --master local[2] \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction

### Spark on YARN mode
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props file:///opt/beh/core/spark/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction

1.3.2 store_sales

1. Edit kafka-source.properties

include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=/opt/beh/core/spark/hudi/config/stock_ticks/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///opt/beh/core/hudi/config/stock_ticks/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-sales-1
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

2. Base.properties

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

3. schema.avsc

{"type":"record","name":"store_sales","fields":[{"name": "customer_id","type": "string"}, {"name": "shop_date", "type": "string"}, {"name": "sum_cost", "type": "double"}]}

1.4 Syncing Hudi Data to the Hive Metastore

1. Prerequisite: a working Hive environment.

2. Obtain run_sync_tool.sh from the hudi-hive module of the Hudi build

${Hoodie-Build}/hudi-hive/run_sync_tool.sh

3. Adjust run_sync_tool.sh

[hadoop@hadoop02 tools]$ git diff run_sync_tool.sh.template run_sync_tool.sh
diff --git a/run_sync_tool.sh.template b/run_sync_tool.sh
index 42d2b9a..66c8180 100755
--- a/run_sync_tool.sh.template
+++ b/run_sync_tool.sh
@@ -47,9 +47,11 @@ if [ -z "${HIVE_JDBC}" ]; then
   HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'`
 fi
 HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
-HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+# HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON
+HIVE_JARS=`ls ${HIVE_HOME}/lib/*.jar | tr '\n' ':'`
-HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/
+# HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/li
+HADOOP_HIVE_JARS=${HIVE_JARS}:${HIVE_HOME}/conf:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
 echo "Running Command : java -cp ${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:$HUDI_HIVE_UBER_JAR org.apache.hudi.hive.HiveSyncTool $@"
 java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR} org.apache.hudi.hive.HiveSyncTool "$@"

4. Place hudi-utilities-*.jar under ${HIVE_HOME}/lib and restart the Hive services

5. Initialize a Kerberos ticket and create the hudi_stock database (see the sketch below)

$ kinit hadoop
$ beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM"
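The DDL for the database itself is not shown above; a minimal sketch, run non-interactively (hudi_stock is the database name used throughout this guide):

beeline -u "jdbc:hive2://hadoop01.bonc.com:10000/;principal=hs2/hadoop01.bonc.com@BONC.COM" \
  -e "CREATE DATABASE IF NOT EXISTS hudi_stock;"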

6. Sync stock_ticks_cow, written to HDFS in section 1.3

./run_sync_tool.sh \
  --jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
  --user hadoop \
  --pass hadoop \
  --partitioned-by dt \
  --base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --database hudi_stock \
  --table stock_ticks_cow

Notes:

1. A kinit ticket cache is required when running the tool.

2. Even when the ticket cache is used, --user and --pass must still be supplied, because the script requires them.

7. Sync stock_ticks_mor, written to HDFS in section 1.3

./run_sync_tool.sh \
  --jdbc-url "jdbc:hive2://hadoop02.bonc.com:10000/;principal=hs2/hadoop02.bonc.com@BONC.COM" \
  --user hadoop \
  --pass hadoop \
  --partitioned-by dt \
  --base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --database hudi_stock \
  --table stock_ticks_mor

8. Check the sync result

0: jdbc:hive2://hadoop02.bonc.com:10000/> show tables from hudi_stock;
+---------------------+
|      tab_name       |
+---------------------+
| stock_ticks_cow     |
| stock_ticks_mor_ro  |
| stock_ticks_mor_rt  |
+---------------------+

Explanation:

Table stock_ticks_cow is produced by step 6 and supports snapshot and incremental queries via HoodieParquetInputFormat.

Tables stock_ticks_mor_rt and stock_ticks_mor_ro are produced by step 7.

stock_ticks_mor_rt supports snapshot and incremental queries via HoodieParquetRealtimeInputFormat, while stock_ticks_mor_ro supports read optimized queries via HoodieParquetInputFormat (a quick way to confirm this is shown below).
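A sketch of that check, assuming the three tables above have been synced: inspect each table's DDL and pick out the InputFormat that HiveSyncTool registered.

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" \
  -e "show create table stock_ticks_cow; show create table stock_ticks_mor_ro; show create table stock_ticks_mor_rt;" \
  | grep -i -A 1 inputformat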

1.5 Query

1.5.1 Run Hive Queries

Prerequisites: the following issues need to be addressed first.

1. Adjust hive-site.xml and restart the Hive services

### Change this setting; the default is org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
<property>
  <name>hive.input.format</name>
  <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
</property>

2. Set the session parameter: set hive.fetch.task.conversion=none;

3. Copy hudi-utilities-*.jar to ${HIVE_HOME}/lib

4. Also distribute hudi-hadoop-mr-*.jar to ${HADOOP_HOME}/share/hadoop/common/ and ${HIVE_HOME}/lib/.

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

show tables;
show partitions stock_ticks_mor_rt;

### Copy_On_Write queries
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read queries
# Read Optimized Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
# Snapshot Query: latest timestamp
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
#
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

Note: the statements above essentially run two kinds of queries against each of the three table types.

First: fetch the latest timestamp.

Second: project a subset of columns over a subset of the current data.

1.5.2 Run Spark-SQL Queries

To run Spark SQL queries, copy hudi-spark-*.jar to ${SPARK_HOME}/jars. There is also an indirect dependency on the hudi-hadoop-mr-*.jar described in the Hive query section (it must be placed into the cluster's hadoop/hive installations).

hudi-hadoop-mr-*.jar provides the RecordReader implementations.

./bin/spark-sql \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

### SQL
show databases;
use hudi_stock;
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
./bin/spark-shell \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
  --driver-class-path /opt/beh/core/hive/conf \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1

### SQL (in the Scala shell)
spark.sql("show databases").show(100, false)
spark.sql("show tables from hudi_stock").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_cow where symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
:quit

1.5.3 Run Presto Queries (succeeded after downgrading the Presto version)

1. Distribute hudi-presto-*.jar to ${PRESTO_HOME}/plugin/hive-hadoop2/ and restart Presto.

2. Adjust ${PRESTO_HOME}/etc/catalog/hive.properties to include the yarn-site configuration file

hive.config.resources=/opt/beh/core/hadoop/etc/hadoop/core-site.xml,/opt/beh/core/hadoop/etc/hadoop/hdfs-site.xml,/opt/beh/core/hadoop/etc/hadoop/yarn-site.xml
./bin/presto \
  --server https://hadoop01.bonc.com:7778 \
  --krb5-config-path /etc/krb5.conf \
  --krb5-principal hadoop@BONC.COM \
  --krb5-keytab-path /opt/beh/metadata/key/presto.keytab \
  --krb5-remote-service-name presto \
  --keystore-path /opt/beh/metadata/key/hadoop.keystore \
  --keystore-password hadoop \
  --catalog hive \
  --schema hudi_stock

### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read_RO
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';

Note: Presto does not support incremental queries. It only supports HoodieParquetInputFormat-based queries, i.e. snapshot queries on Copy_On_Write tables and read optimized queries on the Merge_On_Read _ro tables.

Abnormal termination, caused by a version mismatch:

2020-05-28T18:00:07.263+0800 WARN hive-hive-0 io.prestosql.plugin.hive.util.ResumableTasks ResumableTask completed exceptionally

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/client/util/YarnClientUtils

at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:58)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:216)

at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)

at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)

at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)

at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)

at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.client.util.YarnClientUtils

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 17 more

1.6 Writing the Second Batch of Data

1.6.1 Hudi's bundled data

1. Load into Kafka with kafkacat

cat /opt/beh/core/hudi/data/batch_2.json | kafkacat \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanism=GSSAPI \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal=kafka@BONC.COM \
  -X sasl.kerberos.keytab=/opt/beh/metadata/key/hadoop.keytab \
  -b hadoop02.bonc.com:9092 \
  -t stock-ticks \
  -P

2. Write to HDFS

### Copy_On_Write
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow \
  --props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

### Merge_On_Read
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/spark/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts \
  --target-base-path hdfs://beh001/user/hive/warehouse/stock_ticks_mor \
  --target-table stock_ticks_mor \
  --props file:///opt/beh/core/hudi/config/stock_ticks/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --disable-compaction

3. Check the result

### Copy_On_Write
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/2018/08/31
Found 3 items
-rw-------  3 hadoop hadoop      93 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
-rw-------  3 hadoop hadoop  443789 2020-05-30 18:40 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-21_20200530184035.parquet
-rw-------  3 hadoop hadoop  443518 2020-06-01 09:44 /user/hive/warehouse/stock_ticks_cow/2018/08/31/da906d57-1987-4439-8f18-14f14e380705-0_0-21-24_20200601094404.parquet

### Merge_On_Read
[hadoop@hadoop02 tools]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 3 items
-rw-------  3 hadoop hadoop   21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------  3 hadoop hadoop      93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------  3 hadoop hadoop  443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet

Note: as shown above, the Copy_On_Write table writes the second batch directly into a new columnar (Parquet) file,

while the Merge_On_Read table appends the increment to a log file (see the quick check below).
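A simple way to verify this is to count delta log files versus base Parquet files under each table path (a sketch, assuming the paths used above):

# Delta log files should only show up under the Merge_On_Read table path
for t in stock_ticks_cow stock_ticks_mor; do
  logs=$(hdfs dfs -ls -R /user/hive/warehouse/$t | grep -c '\.log\.')
  parquets=$(hdfs dfs -ls -R /user/hive/warehouse/$t | grep -c '\.parquet$')
  echo "$t: $logs log file(s), $parquets parquet file(s)"
done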

1.6.2 Re-syncing to the Hive metastore

Because this batch does not create any new partitions, no re-sync is needed.

1.7 Query

1.7.1 Run Hive Queries

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

Note: the concrete query results are not shown above; see the Docker Demo chapter on the official site: http://hudi.apache.org/docs/docker_demo.html

What is mainly being demonstrated is the underlying write mechanism of Copy_On_Write versus Merge_On_Read tables.

1.7.2 Run Spark-SQL Queries

./bin/spark-sql \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

### Switch database
show databases;
use hudi_stock;

### Copy_On_Write
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';

### Merge_On_Read
# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

1.7.3 Run Presto Queries

Rebuild prestosql presto-302; see section 4.4 for the build and test issues.

Copy hudi-presto-bundle-0.5.2-incubating.jar to ${PRESTO_HOME}/plugin/hive-hadoop2 and restart Presto.

1.8 Incremental Queries on Copy_On_Write

1. Hive Query

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time  | symbol  |          ts          | volume  |    open    |   close   |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20200530184035       | GOOG    | 2018-08-31 09:59:00  | 6330    | 1230.5     | 1230.02   |
| 20200601094404       | GOOG    | 2018-08-31 10:59:00  | 9021    | 1227.1993  | 1227.215  |
+----------------------+---------+----------------------+---------+------------+-----------+

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';

Note: the specified timestamp lies between the two commit times.

2. Spark-SQL

./bin/spark-sql \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar

select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530185035';

Note: the specified timestamp lies between the two commit times.

3. Spark-shell

./bin/spark-shell \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
  --driver-class-path /opt/beh/core/hive/conf \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1

import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200530185035").
  load("/user/hive/warehouse/stock_ticks_cow")
hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);

1.9 Scheduling and Running Compaction on the Merge_On_Read Table

1. Build Hudi and copy it to an admin node

2. Go to the hudi-cli module

[hadoop@hadoop02 hudi]$ pwd
/home/hadoop/hudi
[hadoop@hadoop02 hudi]$ tree -L 2 hudi-cli
hudi-cli
├── conf
│   └── hudi-env.sh
├── hoodie-cmd.log
├── hudi-cli.sh
├── pom.xml
├── src
│   └── main
└── target
    ├── checkstyle-cachefile
    ├── checkstyle-checker.xml
    ├── checkstyle-result.xml
    ├── checkstyle-suppressions.xml
    ├── classes
    ├── classes.-502447588.timestamp
    ├── generated-sources
    ├── hudi-cli-0.5.2-incubating.jar
    ├── hudi-cli-0.5.2-incubating-sources.jar
    ├── lib
    ├── maven-archiver
    ├── maven-shared-archive-resources
    ├── maven-status
    ├── rat.txt
    └── test-classes

3. Edit the hudi-cli.sh file

### echo `hadoop classpath`
/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:/opt/beh/core/hadoop/contrib/capacity-scheduler/*.jar

### Edit hudi-cli.sh and add the Hadoop jars to the classpath
diff --git a/tmp/hudi-cli.sh b/hudi-cli/hudi-cli.sh
index b6e708c..3086203 100755
--- a/tmp/hudi-cli.sh
+++ b/hudi-cli/hudi-cli.sh
@@ -25,4 +25,4 @@ if [ -z "$CLIENT_JAR" ]; then
   echo "Client jar location not set, please set it in conf/hudi-env.sh"
 fi
-java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
+java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:/opt/beh/core/hadoop/etc/hadoop:/opt/beh/core/hadoop/share/hadoop/common/lib/*:/opt/beh/core/hadoop/share/hadoop/common/*:/opt/beh/core/hadoop/share/hadoop/hdfs:/opt/beh/core/hadoop/share/hadoop/hdfs/lib/*:/opt/beh/core/hadoop/share/hadoop/hdfs/*:/opt/beh/core/hadoop/share/hadoop/yarn/lib/*:/opt/beh/core/hadoop/share/hadoop/yarn/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/lib/*:/opt/beh/core/hadoop/share/hadoop/mapreduce/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@

4. Upload the schema.avsc used when writing to Kafka to hdfs:///var/demo/config

[hadoop@hadoop02 hudi]$ hdfs dfs -ls /var/demo/config/schema.avsc
-rw-------  3 hadoop hadoop  1464 2020-06-01 14:58 /var/demo/config/schema.avsc

5. Run hudi-cli.sh

[hadoop@hadoop02 hudi-cli]$ ./hudi-cli.sh
hudi->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction schedule
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all
hudi:stock_ticks_mor->compaction run --compactionInstant 20200601144520 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
Compaction successfully completed for 20200601144520
hudi:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
hudi:stock_ticks_mor->compactions show all

6. Inspect the Hudi data

[hadoop@hadoop02 hudi]$ hdfs dfs -ls /user/hive/warehouse/stock_ticks_mor/2018/08/31
Found 4 items
-rw-------  3 hadoop hadoop   21623 2020-06-01 09:43 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.d972178a-2139-48cb-adea-909a1735266d-0_20200530184111.log.1_0-21-24
-rw-------  3 hadoop hadoop      93 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/.hoodie_partition_metadata
-rw-------  3 hadoop hadoop  443479 2020-06-01 14:59 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-0-0_20200601144520.parquet
-rw-------  3 hadoop hadoop  443777 2020-05-30 18:41 /user/hive/warehouse/stock_ticks_mor/2018/08/31/d972178a-2139-48cb-adea-909a1735266d-0_0-21-21_20200530184111.parquet

1.10 Hive Queries ON Merge_On_Read

beeline -u "jdbc:hive2://hadoop02.bonc.com:10000/hudi_stock;principal=hs2/hadoop02.bonc.com@BONC.COM" --hiveconf hive.fetch.task.conversion=none

# Read Optimized Query
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
# Snapshot Query
select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
# Incremental Query
select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111';

Note: the commit time specified in the incremental query is the compaction commit timestamp. The official demo uses timestamps from 2018, but at the time this document was written the commits here are from 2020.

1.11 Spark-SQL ON Merge_On_Read

./bin/spark-shell \
  --jars /opt/beh/core/spark/jars/hudi-spark-bundle_2.11-0.5.2-incubating.jar \
  --driver-class-path /opt/beh/core/hive/conf \
  --master yarn \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --deploy-mode client \
  --driver-memory 1G \
  --executor-memory 3G \
  --num-executors 1

# Read Optimized Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
# Snapshot Query
spark.sql("select symbol, max(ts) from hudi_stock.stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
# Incremental Query
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from hudi_stock.stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20200530284111'").show(100, false)

2. Administration

During testing there are two administrative operations:

one is syncing to the Hive metastore with the run_sync_tool.sh script, and the other is running compaction on the Merge_On_Read table through hudi-cli.sh.

Both depend on the jars produced by the Hudi build, so it may be worth planning a dedicated directory layout for Hudi (a sketch follows).
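One possible layout, offered purely as a suggestion (none of these directory names are mandated by Hudi or by this guide):

# Hypothetical layout: collect the bundle jars from the build tree (/home/hadoop/hudi, see 1.9) in one place
mkdir -p /opt/beh/core/hudi/{jars,config,tools}
find /home/hadoop/hudi -name '*bundle*-0.5.2-incubating.jar' -exec cp {} /opt/beh/core/hudi/jars/ \;
cp /home/hadoop/hudi/hudi-hive/run_sync_tool.sh /opt/beh/core/hudi/tools/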

3. Stress Testing with Custom Data

Note: known issues with the custom data set:

First, the data consists entirely of inserts and contains no updates. The tests use the default write operation, upsert; insert and bulk_insert are also supported (see the sketch after this note).

Second, the data does not arrive incrementally over time; essentially all partitions are already created by the first batch.
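For reference, the write operation can be switched away from the default upsert with HoodieDeltaStreamer's --op flag. A minimal sketch, reusing the submission from section 3.2.2:

# Same submission as in 3.2.2, but with an explicit write operation (UPSERT is the default; INSERT and BULK_INSERT are the alternatives)
spark-submit \
  --master yarn \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --op BULK_INSERT \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field shop_date \
  --target-base-path hdfs://beh001/user/hive/warehouse/store_sales_cow \
  --target-table store_sales_cow \
  --props hdfs://beh001/hudi/store_sales/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider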

3.1 Data Preparation: Loading into Kafka

1. See section 1.1.2 for preparing the custom data.

2. See section 1.2 for loading the data into Kafka and the related configuration.

3.2 Writing from Kafka to HDFS

3.2.1 Configuration

1. base.properties

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

2. schema.avsc

{"type":"record","name":"store_sales","fields":[{"name": "customer_id","type": "string"}, {"name": "shop_date", "type": "string"}, {"name": "sum_cost", "type": "double"}]}

3. kafka-source.properties

include=hdfs://beh001/hudi/store2/base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=customer_id
hoodie.datasource.write.partitionpath.field=shop_date
# schema provider configs
#schema.registry.url=http://localhost:8081
#hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://beh001/hudi/store2/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://beh001/hudi/store2/schema.avsc
# Kafka Source
#hoodie.deltastreamer.source.kafka.topic=uber_trips
hoodie.deltastreamer.source.kafka.topic=store-2
#Kafka props
bootstrap.servers=hadoop02.bonc.com:9092,hadoop03.bonc.com:9092,hadoop04.bonc.com:9092
auto.offset.reset=earliest
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/opt/beh/metadata/key/hadoop.keytab" principal="kafka@BONC.COM";

Note: the configuration files are uploaded to HDFS, so every file reference inside them starts with hdfs:// (see the upload sketch below).
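Uploading the files is straightforward (a sketch; the target directory is an assumption, since the examples above reference both /hudi/store2 and /hudi/store_sales, so adjust the path to wherever the properties actually point):

hdfs dfs -mkdir -p /hudi/store_sales
hdfs dfs -put -f base.properties schema.avsc kafka-source.properties /hudi/store_sales/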

3.2.2 Continuous writes to the COW table

nohup spark-submit \
  --master yarn \
  --num-executors 12 \
  --executor-memory 18G \
  --executor-cores 6 \
  --deploy-mode cluster \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type COPY_ON_WRITE \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field shop_date \
  --target-base-path hdfs://beh001/user/hive/warehouse/store_sales_cow \
  --target-table store_sales_cow \
  --props hdfs://beh001/hudi/store_sales/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &

Stress test results:

| CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
| ---------- | ------------------- | ----------------- | ------------------- | ------------------------ | --------------------- | ---------------------------- | ------------ |
| 20200623114826 | 9.5 GB | 0 | 1823 | 1823 | 555059000 | 0 | 0 |
| 20200623114342 | 9.4 GB | 0 | 1823 | 1823 | 551850199 | 0 | 0 |
| 20200623113915 | 9.4 GB | 0 | 1823 | 1823 | 546850199 | 0 | 0 |
| 20200623113438 | 9.3 GB | 0 | 1823 | 1823 | 541850199 | 0 | 0 |
| 20200623113009 | 9.2 GB | 0 | 1823 | 1823 | 536850199 | 0 | 0 |
| 20200623112534 | 9.1 GB | 0 | 1823 | 1823 | 531850199 | 0 | 0 |
| 20200623112104 | 9.0 GB | 0 | 1823 | 1823 | 526850199 | 0 | 0 |
| 20200623111641 | 9.0 GB | 0 | 1823 | 1823 | 521850199 | 0 | 0 |
| 20200623111205 | 8.9 GB | 0 | 1823 | 1823 | 516850199 | 0 | 0 |
| 20200623110736 | 8.8 GB | 0 | 1823 | 1823 | 511850199 | 0 | 0 |
| 20200623110320 | 8.7 GB | 0 | 1823 | 1823 | 506850200 | 0 | 0 |
| 20200623105855 | 8.7 GB | 0 | 1823 | 1823 | 501850200 | 0 | 0 |
| 20200623105435 | 8.6 GB | 0 | 1823 | 1823 | 496850200 | 0 | 0 |
| 20200623105000 | 8.5 GB | 0 | 1823 | 1823 | 491850200 | 0 | 0 |
| 20200623104543 | 8.4 GB | 0 | 1823 | 1823 | 486850200 | 0 | 0 |
| 20200623104120 | 8.3 GB | 0 | 1823 | 1823 | 481850200 | 0 | 0 |
| 20200623103705 | 8.3 GB | 0 | 1823 | 1823 | 476850200 | 0 | 0 |
| 20200623103305 | 8.2 GB | 0 | 1823 | 1823 | 471850200 | 0 | 0 |
| 20200623102848 | 8.1 GB | 0 | 1823 | 1823 | 466850200 | 0 | 0 |
| 20200623102440 | 8.0 GB | 0 | 1823 | 1823 | 461850200 | 0 | 0 |
| 20200623102030 | 7.9 GB | 0 | 1823 | 1823 | 456850200 | 0 | 0 |
| 20200623101628 | 7.9 GB | 0 | 1823 | 1823 | 451850200 | 0 | 0 |
| 20200623101229 | 7.8 GB | 0 | 1823 | 1823 | 446850199 | 0 | 0 |

3.2.3 Continuous writes to the MOR table

nohup spark-submit \
  --master yarn \
  --num-executors 12 \
  --executor-memory 18G \
  --executor-cores 6 \
  --deploy-mode cluster \
  --keytab /opt/beh/metadata/key/presto.keytab \
  --principal hadoop@BONC.COM \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  /opt/beh/core/hudi/jars/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
  --table-type MERGE_ON_READ \
  --continuous \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field shop_date \
  --target-base-path hdfs://beh001/user/hive/warehouse/store_sales_mor \
  --target-table store_sales_mor \
  --props hdfs://beh001/hudi/store_sales/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider &

Stress test results:

| CommitTime | Total Bytes Written | Total Files Added | Total Files Updated | Total Partitions Written | Total Records Written | Total Update Records Written | Total Errors |
| ---------- | ------------------- | ----------------- | ------------------- | ------------------------ | --------------------- | ---------------------------- | ------------ |
| 20200623165326 | 3.6 GB | 0 | 1823 | 1823 | 179999235 | 0 | 0 |
| 20200623165027 | 3.5 GB | 0 | 1823 | 1823 | 174999235 | 0 | 0 |
| 20200623164729 | 3.4 GB | 0 | 1823 | 1823 | 169999235 | 0 | 0 |
| 20200623164431 | 3.3 GB | 0 | 1823 | 1823 | 164999235 | 0 | 0 |
| 20200623164141 | 3.3 GB | 0 | 1823 | 1823 | 159999235 | 0 | 0 |
| 20200623163838 | 3.2 GB | 0 | 1823 | 1823 | 154999235 | 0 | 0 |
| 20200623163550 | 3.1 GB | 0 | 1823 | 1823 | 149999235 | 0 | 0 |
| 20200623163254 | 3.0 GB | 0 | 1823 | 1823 | 144999235 | 0 | 0 |
| 20200623163017 | 3.0 GB | 0 | 1823 | 1823 | 139999235 | 0 | 0 |
| 20200623162735 | 2.9 GB | 0 | 1823 | 1823 | 134999235 | 0 | 0 |
| 20200623162459 | 2.8 GB | 0 | 1823 | 1823 | 129999235 | 0 | 0 |
| 20200623162223 | 2.7 GB | 0 | 1823 | 1823 | 124999235 | 0 | 0 |
| 20200623161945 | 2.6 GB | 0 | 1823 | 1823 | 119999235 | 0 | 0 |
| 20200623161707 | 2.6 GB | 0 | 1823 | 1823 | 114999235 | 0 | 0 |
| 20200623161441 | 2.5 GB | 0 | 1823 | 1823 | 109999235 | 0 | 0 |
| 20200623161211 | 2.4 GB | 0 | 1823 | 1823 | 104999235 | 0 | 0 |
| 20200623160943 | 2.3 GB | 0 | 1823 | 1823 | 99999235 | 0 | 0 |
| 20200623160700 | 2.2 GB | 0 | 1823 | 1823 | 94999235 | 0 | 0 |
| 20200623160440 | 2.2 GB | 0 | 1823 | 1823 | 89999235 | 0 | 0 |
| 20200623160225 | 2.1 GB | 0 | 1823 | 1823 | 84999235 | 0 | 0 |
| 20200623160002 | 2.0 GB | 0 | 1823 | 1823 | 79999235 | 0 | 0 |
| 20200623155741 | 1.9 GB | 0 | 1823 | 1823 | 74999235 | 0 | 0 |
| 20200623155527 | 1.8 GB | 0 | 1823 | 1823 | 69999235 | 0 | 0 |

3.3 Open Questions

3.3.1 Test data

The custom data consists entirely of inserts; it contains no updates.

3.3.2 No delta log when writing MOR

When writing the MOR table with custom data that contains no updates, no delta log is produced. After adjusting the custom data to include some updates and writing the MOR table again, delta logs were confirmed to be generated (one way to inject updates is sketched below).
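One simple way to inject updates, shown only as a sketch: re-emit some existing records with the same customer_id and shop_date (the configured record key and partition path) but a different sum_cost, and produce them to the same topic.

# Rewrite sum_cost on the first 100,000 records so they land as updates (same key, same partition)
head -n 100000 store_sales.json \
  | awk '{ sub(/"sum_cost":"[^"]*"/, "\"sum_cost\":\"0.0\""); print }' \
  > store_sales_updates.json

# Produce the update records to the same topic used for the initial load
kafka-producer-perf-test.sh --topic store-2 --throughput 100000 \
  --producer.config /opt/beh/core/kafka/config/producer.properties \
  --payload-file store_sales_updates.json --num-records 100000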

4 Issue Log

4.1 Using kafkacat

[hadoop@bdev001 beh]$ cat batch_1.json | kafkacat -F kafkacat.conf -t zjh -P
% Reading configuration from file kafkacat.conf
%2|1591338623.769|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev001.bonc.com:9092/bootstrap]: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev001.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338624.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev003.bonc.com:9092/bootstrap]: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev003.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
%2|1591338625.768|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://bdev002.bonc.com:9092/bootstrap]: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: No worthy mechs found
% ERROR: Local: Authentication failure: sasl_plaintext://bdev002.bonc.com:9092/bootstrap: Failed to initialize SASL authentication: SASL handshake failed (start (-4)): SASL(-4): no mechanism available: No worthy mechs found (after 0ms in state AUTH)
% ERROR: Local: All broker connections are down: 3/3 brokers are down : terminating

Fix:

yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi

4.2 Hive Queries

When running Hive queries, the ApplicationMaster reports the following exception:

ERROR : Job failed with java.lang.ClassNotFoundException: org.apache.avro.LogicalType

java.util.concurrent.ExecutionException: Exception thrown by job

at org.apache.spark.JavaFutureActionWrapper.getImpl(FutureAction.scala:337)

at org.apache.spark.JavaFutureActionWrapper.get(FutureAction.scala:342)

at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:382)

at org.apache.hive.spark.client.RemoteDriver$JobWrapper.call(RemoteDriver.java:343)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 4, bdev002.bonc.com, executor 1): java.lang.NoClassDefFoundError: org/apache/avro/LogicalType

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:108)

at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:50)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)

at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)

at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)

at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)

at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 23 more

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at scala.Option.foreach(Option.scala:257)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Caused by: java.lang.NoClassDefFoundError: org/apache/avro/LogicalType

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:335)

at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:108)

at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:50)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)

at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)

at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:251)

at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:418)

at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:257)

at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:256)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:214)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.avro.LogicalType

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 23 more

Fix:

Investigation showed that the avro-1.7.4.jar shipped with our Hadoop distribution indeed lacks the class, while the newer avro-1.8.2.jar found in the Hudi build does contain it. Replacing the older avro jar under /opt/beh/core/hadoop/share/hadoop/common/lib and /opt/beh/core/hive/lib/ with the newer one resolved the problem (a sketch of the swap follows).
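A sketch of the jar swap, assuming avro-1.8.2.jar can be found in the local Maven repository used by the Hudi build (the exact location is an assumption):

# Locate a 1.8.2 avro jar from the build environment
AVRO_JAR=$(find ~/.m2/repository/org/apache/avro -name 'avro-1.8.2.jar' | head -n 1)

# Swap the old jar in the Hadoop and Hive lib directories (repeat on every affected node), then restart the services
for d in /opt/beh/core/hadoop/share/hadoop/common/lib /opt/beh/core/hive/lib; do
  mv "$d/avro-1.7.4.jar" "$d/avro-1.7.4.jar.bak"
  cp "$AVRO_JAR" "$d/"
done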

4.3 Presto Queries

1. HoodieException

org.apache.hudi.exception.HoodieException: Error reading Hoodie partition metadata for hdfs://beh001/user/hive/warehouse/stock_ticks_cow/2018/08/31

### Enable DEBUG logging on the Presto coordinator: edit ${PRESTO_HOME}/etc/log.properties
io.prestosql=DEBUG

### Log output
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=kafka, access=EXECUTE, inode="/user/hive/warehouse":hadoop:hadoop:drwx------

The root cause is that the kafka user has no permission on the path.

Why the kafka user? Presto had picked up the refreshed credential cache, which held the kafka principal (possible remediations are sketched below).
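Possible remediations, offered only as sketches rather than as the fix used here: either make the Presto Hive connector authenticate to HDFS as the intended principal (for example via the hive.hdfs.presto.principal / hive.hdfs.presto.keytab catalog properties), or grant the querying principal read access to the warehouse path, e.g. with HDFS ACLs:

# Assumption: HDFS ACLs are enabled (dfs.namenode.acls.enabled=true)
hdfs dfs -setfacl -R -m user:kafka:r-x /user/hive/warehouse
hdfs dfs -setfacl -R -m default:user:kafka:r-x /user/hive/warehouse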

4.4 Build and Test

4.4.1 Hadoop native libraries cannot be loaded

java.lang.ExceptionInInitializerError

at io.prestosql.plugin.hive.HdfsEnvironment$$FastClassByGuice$$2c8553d4.newInstance()

at com.google.inject.internal.DefaultConstructionProxyFactory$FastClassProxy.newInstance(DefaultConstructionProxyFactory.java:89)

at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:114)

at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)

at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)

at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)

at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)

at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)

at com.google.inject.internal.SingleParameterInjector.inject(SingleParameterInjector.java:42)

at com.google.inject.internal.SingleParameterInjector.getAll(SingleParameterInjector.java:65)

at com.google.inject.internal.ConstructorInjector.provision(ConstructorInjector.java:113)

at com.google.inject.internal.ConstructorInjector.construct(ConstructorInjector.java:91)

at com.google.inject.internal.ConstructorBindingImpl$Factory.get(ConstructorBindingImpl.java:306)

at com.google.inject.internal.FactoryProxy.get(FactoryProxy.java:62)

at com.google.inject.internal.ProviderToInternalFactoryAdapter.get(ProviderToInternalFactoryAdapter.java:40)

at com.google.inject.internal.SingletonScope$1.get(SingletonScope.java:168)

at com.google.inject.internal.InternalFactoryToProviderAdapter.get(InternalFactoryToProviderAdapter.java:39)

at com.google.inject.internal.InternalInjectorCreator.loadEagerSingletons(InternalInjectorCreator.java:211)

at com.google.inject.internal.InternalInjectorCreator.injectDynamically(InternalInjectorCreator.java:182)

at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:109)

at com.google.inject.Guice.createInjector(Guice.java:87)

at io.airlift.bootstrap.Bootstrap.initialize(Bootstrap.java:240)

at io.prestosql.plugin.hive.HiveConnectorFactory.create(HiveConnectorFactory.java:123)

at io.prestosql.connector.ConnectorManager.createConnector(ConnectorManager.java:321)

at io.prestosql.connector.ConnectorManager.addCatalogConnector(ConnectorManager.java:195)

at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:187)

at io.prestosql.connector.ConnectorManager.createConnection(ConnectorManager.java:173)

at io.prestosql.metadata.StaticCatalogStore.loadCatalog(StaticCatalogStore.java:96)

at io.prestosql.metadata.StaticCatalogStore.loadCatalogs(StaticCatalogStore.java:74)

at io.prestosql.server.PrestoServer.run(PrestoServer.java:122)

at io.prestosql.server.PrestoServer.main(PrestoServer.java:68)

Caused by: java.lang.RuntimeException: failed to load Hadoop native library

at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:58)

at io.prestosql.plugin.hive.HdfsEnvironment.<init>(HdfsEnvironment.java:37)

... 31 more

Caused by: java.lang.RuntimeException: native snappy library not available: SnappyCompressor has not been loaded.

at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:72)

at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)

at io.prestosql.hadoop.HadoopNative.loadAllCodecs(HadoopNative.java:71)

at io.prestosql.hadoop.HadoopNative.requireHadoopNative(HadoopNative.java:52)

... 32 more

Fix:

export LD_LIBRARY_PATH=/opt/beh/core/hadoop/lib/native

Alternatively, handle it at build time (a sketch of applying the variable at launch time follows).
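A sketch of applying the variable at launch time (assuming the standard presto-server launcher):

# Make the Hadoop native libraries (libhadoop, libsnappy, ...) visible to the Presto JVM
export LD_LIBRARY_PATH=/opt/beh/core/hadoop/lib/native
${PRESTO_HOME}/bin/launcher restart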

4.4.2 org.apache.hadoop.yarn.conf.HAUtil cannot be loaded

java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/conf/HAUtil

at org.apache.hadoop.mapred.Master.getMasterAddress(Master.java:61)

at org.apache.hadoop.mapred.Master.getMasterPrincipal(Master.java:88)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)

at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:81)

at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:213)

at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:105)

at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:362)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)

at io.prestosql.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)

at io.prestosql.plugin.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)

at io.prestosql.plugin.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)

at io.prestosql.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)

at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.conf.HAUtil

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at io.prestosql.server.PluginClassLoader.loadClass(PluginClassLoader.java:80)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

... 18 more

Fix: copy the hadoop-yarn-api jar to ${PRESTO_HOME}/plugin/hive-hadoop2 (a sketch follows).
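A sketch of that copy, assuming the BEH Hadoop layout used elsewhere in this guide:

# Copy the yarn-api jar into the Presto hive plugin directory on every Presto node, then restart
cp /opt/beh/core/hadoop/share/hadoop/yarn/hadoop-yarn-api-*.jar ${PRESTO_HOME}/plugin/hive-hadoop2/
${PRESTO_HOME}/bin/launcher restart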

