Flink 实践 | Flink CDC + Hudi + Hive + Presto 构建实时数据湖最佳实践

共 10394字,需浏览 21分钟

 ·

2021-11-24 02:19



1. 测试过程环境版本说明

Flink1.13.1

Scala2.11

CDH6.2.0

Hadoop3.0.0

Hive2.1.1

Hudi0.10(master)

PrestoDB0.256

Mysql5.7

2. 集群服务器基础环境

2.1 Maven和JDK环境版本

2.2 Hadoop 集群环境版本

2.3 HADOOP环境变量配置

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoopexport HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`  

3. Hudi编译环境配置

3.1 Maven Home settings.xml配置修改

说明:指定aliyun maven地址(支持CDH cloudera依赖) mirror库

alimavencentral,!clouderaaliyun mavenhttp://maven.aliyun.com/nexus/content/groups/public/

3.2 下载Hudi源码包

git clone https://github.com/apache/hudi.git

Hudi社区建议版本适配

hudi0.9 适配 flink1.12.2

hudi0.10(master) 适配 flink1.13.X ( 说明master分支上版本还未release)

3.3 Hudi 客户端命令行

3.4 修改Hudi集成flink和Hive编译依赖版本配置

hudi-master/packaging/hudi-flink-bundle

pom.xml文件 ( 笔者环境CDH6.2.0 hive2.1.1)

flink-bundle-shade-hive22.1.1-cdh6.2.0compile${hive.groupid}hive-service-rpc${hive.version}${flink.bundle.hive.scope}

3.5 编译Hudi 指定Hadoop和Hive版本信息

mvn clean install -DskipTests-Drat.skip=true-Dscala-2.11-Dhadoop.version=3.0.0-Pflink-bundle-shade-hive2

(可加 –e –X 参数查看编译ERROR异常和DEBUG信息)

说明:默认scala2.11、默认不包含hive依赖

首次编译耗时较长 笔者首次编译大概花费 50min+(也和服务器网络有关)

后续编译会快一些 大约15min左右

3.6 Hudi编译异常

修改Hudi master pom.xml 增加 CDH repository地址

3.7 Hudi重新编译

3.8 Hudi编译结果说明

hudi-master/packaging/hudi-flink-bundle/target

hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:hudi-flink-bundle jar 是 flink 用来写入和读取数据

hudi-master/packaging/hudi-hadoop-mr-bundle/target

hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

说明:hudi-mr-bundle jar 是 hive 需要用来读hudi数据

4. Flink环境配置

版本说明:Flink 1.13.1 scala2.11版本

4.1 FLINK_HOME 下 sql-client-defaults.yaml 配置

4.2 flink-conf.yaml配置修改

# state.backend: filesystemstate.backend: rocksdb# 开启增量checkpointstate.backend.incremental: true# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpointsstate.checkpoints.dir: hdfs://nameservice/flink/flink-checkpointsclassloader.check-leaked-classloader: falseclassloader.resolve-order: parent-first

4.3 FLINK_HOME lib下添加依赖

flink-sql-connector-mysql-cdc-1.4.0.jarflink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar.BAK – oracle cdc 依赖 flink-format-changelog-json-1.4.0.jarflink-sql-connector-kafka_2.11-1.13.1.jar--- Hadoop home lib下copy过来hadoop-mapreduce-client-common-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-jobclient-3.0.0-cdh6.2.0.jar--- hudi编译jar copy过来hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar

说明:目前oracle cdc jar和mysql cdc jar一起在lib下发现有冲突异常

5 启动flink yarn session服务

5.1 FLINK_HOME shell 命令

$FLINK_HOME/bin/yarn-session.sh -s 2-jm 2048-tm 2048-nm ys-hudi01 -d

5.2 Yarn Web UI

5.3 Flinksql Client 启动命令

$FLINK_HOME/bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell

说明:-j指定hudi-flink 依赖jar

Show table /show catalogs

6. MySQL binlog 开启配置

6.1 创建binlog日志存储路径

mkdir logs

6.2 修改目录属主和group

chown -R mysql:mysql /mysqldata/logs

6.3 修改mysql配置信息

vim /etc/my.cnfserver-id=2log-bin= /mysqldata/logs/mysql-binbinlog_format=rowexpire_logs_days=15binlog_row_image=full

6.4 修改完,重启mysql server

service mysqld restart

6.5 客户端查看binlog日志情况

show master logs;

Mysql 版本:5.7.30

5.6 创建mysql sources 表 DDL

create table users_cdc(   id bigint auto_increment primary key,   name varchar(20) null,   birthday timestamp default CURRENT_TIMESTAMP notnull,   ts timestamp default CURRENT_TIMESTAMP notnull);

7. FlinkCDC sink Hudi测试代码过程

7.1 Flink sql cdc DDL 语句:(具体参数说明可参考flink官网)

CREATE TABLE mysql_users (    id BIGINT PRIMARY KEY NOT ENFORCED ,    name STRING,    birthday TIMESTAMP(3),    ts TIMESTAMP(3)) WITH ('connector'= 'mysql-cdc','hostname'= '127.0.0.1','port'= '3306','username'= '','password'=’’,'server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'luo','table-name'= 'users_cdc');

7.2 查询mysql cdc 表

Flink SQL> select * from mysql_users;

由于目前MySQL users_cdc表是空,所以flinksql 查询没有数据 只有表结构;

Flink web UI:

7.3 创建一个临时视图,增加分区列 方便后续同步hive分区表

Flink SQL> create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as partition FROM mysql_users;

说明:partition 关键字需要 `` 引起来

查询视图数据也是空结构,但增加了分区字段:

Flink SQL> select * from mycdc_v;

Flink web UI:

7.4 设置checkpoint间隔时间,存储路径已在flink-conf配置设置全局路径

建议:测试环境 可设置秒级别(不能太小),生产环境可设置分钟级别。

Flink SQL> set execution.checkpointing.interval=30sec;

7.5 Flinksql 创建 cdc sink hudi文件,并自动同步hive分区表DDL 语句

CREATE TABLE mysqlcdc_sync_hive01(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),`partition` VARCHAR(20),primary key(id) not enforced --必须指定uuid 主键)PARTITIONED BY (`partition`)with('connector'='hudi','path'= 'hdfs://nameservice /luo/hudi/mysqlcdc_sync_hive01', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键, 'write.precombine.field'= 'ts'-- 自动precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ , 'compaction.async.enabled'= 'true'-- 是否开启异步压缩, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩, 'compaction.delta_commits'= '1'-- 默认为5, 'changelog.enabled'= 'true'-- 开启changelog变更, 'read.streaming.enabled'= 'true'-- 开启流读, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s, 'hive_sync.enable'= 'true'-- 开启自动同步hive, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式, 'hive_sync.metastore.uris'= 'thrift://hadoop:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop:10000'-- hiveServer地址, 'hive_sync.table'= 'mysqlcdc_sync_hive01'-- hive 新建表名, 'hive_sync.db'= 'luo'-- hive 新建数据库名, 'hive_sync.username'= ''-- HMS 用户名, 'hive_sync.password'= ''-- HMS 密码, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型);

说明:Hudi目前支持MOR和COW两种模式

(1) Copy on Write:使用列式存储来存储数据(例如:parquet),通过在写入期间执行同步合并来简单地更新和重现文件

(2) Merge on Read:使用列式存储(parquet)+行式文件(arvo)组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。

COW:Copy on Write (写时复制),快照查询+增量查询

MOR:Merge on Read (读时合并),快照查询+增量查询+读取优化查询(近实时)

使用场景上:

(1)COW适用写少读多的场景 ,MOR 适用写多读少的场景;

(2)MOR适合CDC场景,更新延迟要求较低,COW目前不支持 changelog mode 不适合处理cdc场景;

Flink web UI

7.6 Flink sql mysql cdc数据写入hudi文件数据

Flink SQL> insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,partition from mycdc_v;

Flink web UI DAG图:

7.7 HDFS上Hudi文件目录情况

说明:目前还没写入测试数据,hudi目录只生成一些状态标记文件,还未生成分区目录以及.log 和.parquet数据文件,具体含义可见hudi官方文档。

7.8 Mysql数据源写入测试数据

insert into users_cdc (name) values ('cdc01');

7.9 Flinksql 查询mysql cdc insert数据:

Flink SQL> set execution.result-mode=tableau;

[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.

[INFO] Session property has been set.

Flink SQL> select * from mysql_users; -- 查询到一条insert数据

7.10 Flink web UI页面可以看到DAG 各个环节产生一条测试数据

7.11 Flinksql 查询 sink的hudi表数据

Flink SQL> select * from mysqlcdc_sync_hive01; --已查询到一条insert数据

7.12 Hdfs上Hudi文件目录变化情况

7.13 Hive分区表和数据自动同步情况

7.14 查看自动创建hive表结构

hive> show create table mysqlcdc_sync_hive01_ro;

hive> show create table mysqlcdc_sync_hive01_rt;

7.15 查看自动生成的表分区信息

hive> show partitions mysqlcdc_sync_hive01_ro;

hive> show partitions mysqlcdc_sync_hive01_rt;

说明:已自动生产hudi MOR模式的

mysqlcdc_sync_hive01_ro

mysqlcdc_sync_hive01_rt

ro表和rt表区别:

ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和普通的 Hive 表一样查询即可;

rt表示增量视图,主要针对增量查询的rt表;

ro表只能查parquet文件数据, rt表 parquet文件数据和log文件数据都可查;

7.16 Hive访问Hudi数据

说明:需要引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar

引入Hudi依赖jar方式:

(1) 引入到 $HIVE_HOME/lib下;

(2) 引入到$HIVE_HOME/auxlib 自定义第三方依赖 修改 hive-site.xml配置文件;

(3) Hive shell命令行引入 Session级别有效;

其中(1)和(3)配置完后需要重启 hive-server服务;

查询Hive 分区表数据:

hive> select * from mysqlcdc_sync_hive01_ro; --已查询到mysq insert的一条数据

hive> select * from mysqlcdc_sync_hive01_rt; --已查询到mysq insert的一条数据

Hive 条件查询:

hive> select name,ts from mysqlcdc_sync_hive01_ro where partition='20211109';

Hive ro表 count查询

hive> select count(1) from mysqlcdc_sync_hive01_ro;


Hive Count异常解决:

引入hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar依赖

hive> add jar hdfs://nameservice /luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;

hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;


hive> select count(1) from mysqlcdc_sync_hive01_ro; --可正常count


Hive rt表 count查询

hive> select count(1) from mysqlcdc_sync_hive01_rt;

说明:rt 表count 还是异常,和Hudi社区人员沟通hudi master目前还没release这块存在bug正在修复中

具体见:https://issues.apache.org/jira/browse/HUDI-2649

7.17 Mysql 数据源写入多条测试数据

insert into users_cdc (name) values ('cdc02');insert into users_cdc (name) values ('cdc03');insert into users_cdc (name) values ('cdc04');insert into users_cdc (name) values ('cdc05');insert into users_cdc (name) values ('cdc06');


Flink web UI DAG中数据链路情况:

7.18 Flinksql中新写入数据查询情况

Yarn web UI application_1626256835287_40351[1]资源使用情况


Hdfs上Hudi文件目录变化情况


Hudi状态文件说明:

(1)requested:表示一个动作已被安排,但尚未启动

(2)inflight:表示当前正在执行操作

(3)completed:表示在时间线上完成了操作

Flink jobmanager log sync hive过程详细日志




7.19 Mysql 数据源更新数据

update users_cdc set name = 'cdc05-bj'where id = 5;


7.20 Flinksql 查询cdc update数据 产生两条binlog数据


说明:flinksql 查询最终只有一条+I有效数据,且数据已更新

Flink web UI DAG接受到两条binlog数据,但最终compact和sink只有一条有效数据


7.21 MySQL 数据源 delete 一条数据:

deletefrom users_cdc where id = 3;


Flink Web UI job DAG中捕获一条新数据:


Flinksql changlog delete数据变化查询

HDFS上Hudi数据文件生成情况



Hudi文件类型说明:

(1)commits: 表示将一批数据原子性写入表中

(2)cleans: 清除表中不在需要的旧版本文件的后台活动

(3)delta_commit:增量提交是指将一批数据原子性写入MergeOnRead类型的表中,其中部分或者所有数据可以写入增量日志中

(4)compaction: 协调hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交

(5)rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据


说明:hudi分区文件以及.log和.parquet文件都已生成

两种文件区别:Hudi会在DFS分布式文件系统上的basepath基本路径下组织成目录结构。每张对应的表都会成多个分区,这些分区是包含该分区的数据文件的文件夹,与hive的目录结构非常相似。在每个分区内,文件被组织成文件组,文件id为唯一标识。每个文件组包含多个切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件(parquet文件),以及自生成基本文件以来对基本文件的插入/更新的一组日志文件(*.log)。Hudi采用MVCC设计,其中压缩操作会将日志和基本文件合并成新的文件片,清理操作会将未使用/较旧的文件片删除来回收DFS上的空间。

Flink 任务checkpoint 情况:

设置30s 一次



7.22 Hive shell查询数据update和delete变化情况:

hive> select * from mysqlcdc_sync_hive01_ro;


hive> select * from mysqlcdc_sync_hive01_rt;


7.23 Hudi Client端操作Hudi表

进入Hudi客户端命令行:

hudi-master/hudi-cli/hudi-cli.sh

连接Hudi表,查看表信息

hudi->connect --path hdfs://nameservice1/tmp/luo/hudi/mysqlcdc_sync_hive01


查看Hudi commit信息

hudi:mysqlcdc_sync_hive01->commits show --sortBy "CommitTime"

查看Hudi compactions 计划

hudi:mysqlcdc_sync_hive01->compactions show all

7.24 PrestoDB 查询Hive表Hudi数据

版本说明:PrestoDB 0.256 DBeaver7.0.4

PrestoDB 集群配置和hive集成参考PrestoDB官网

presto-server-***/etc/catalog/hive.properties 配置hive catalog

可通过 presto-cli 连接 hive metastore 开启查询,presto-cli 的设置参考 presto官方配置;

DBeaver客户端查询Hive ro表数据:


Hive ro表count 正常:


查询Hive rt表数据查询异常:


Hive rt表count异常:


Presto Web ui:



推荐阅读


超详细步骤!整合Apache Hudi + Flink + CDH

Apache Kyuubi + Hudi在 T3 出行的深度实践

Apache Hudi PMC畅谈Hudi未来演进之路

顺丰科技 Hudi on Flink 实时数仓实践

一文彻底弄懂Apache Hudi不同表类型



浏览 115
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报