Flink SQL 1.11 on Zeppelin集成指南

程序源代码

共 4497字,需浏览 9分钟

 ·

2020-11-06 09:14

点击上方蓝色字体,选择“设为星标”

回复”资源“获取更多惊喜

6265f40f49fb5e0d262cb3ad11e742f0.webp

6f1dffda8d21badb2fdd1a86563acf96.webp

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

e1dc5f3a69930820a39af9d6f8cc9433.webp

大数据真好玩点击右侧关注,大数据真好玩!

前言

大数据领域SQL化开发的风潮方兴未艾(所谓"Everybody knows SQL"),Flink自然也不能“免俗”。Flink SQL是Flink系统内部最高级别的API,也是流批一体思想的集大成者。用户可以通过简单明了的SQL语句像查表一样执行流任务或批任务,屏蔽了底层DataStream/DataSet API的复杂细节,降低了使用门槛。但是,Flink SQL的默认开发方式是通过Java/Scala API编写,与纯SQL化、平台化的目标相去甚远。目前官方提供的Flink SQL Client仅能在配备Flink客户端的本地使用,局限性很大。而Ververica开源的Flink SQL Gateway组件是基于REST API的,仍然需要二次开发才能供给上层使用,并不是很方便。鉴于有很多企业都无法配备专门的团队来解决Flink SQL平台化的问题,那么到底有没有一个开源的、开箱即用的、功能相对完善的组件呢?答案就是本文的主角——Apache Zeppelin。

Flink SQL on Zeppelin!

eb5a40a64230a57471ae7a0b391451e4.webp

Zeppelin是基于Web的交互式数据分析笔记本,支持SQL、Scala、Python等语言。Zeppelin通过插件化的Interpreter(解释器)来解析用户提交的代码,并将其转化到对应的后端(计算框架、数据库等)执行,灵活性很高。其架构简图如下所示。

178caac0aefbf57aecaf030a7917a251.webp

Flink Interpreter就是Zeppelin原生支持的众多Interpreters之一。只要配置好Flink Interpreter以及相关的执行环境,我们就可以将Zeppelin用作Flink SQL作业的开发平台了(当然,Scala和Python也是没问题的)。接下来本文就逐步介绍Flink on Zeppelin的集成方法。

配置Zeppelin

目前Zeppelin的最新版本是0.9.0-preview2,可以在官网下载包含所有Interpreters的zeppelin-0.9.0-preview2-bin-all.tgz,并解压到服务器的合适位置。接下来进入conf目录。将环境配置文件zeppelin-env.sh.template更名为zeppelin-env.sh,并修改:
# JDK目录
export JAVA_HOME=/opt/jdk1.8.0_172
# 方便之后配置Interpreter on YARN模式。注意必须安装Hadoop,且hadoop必须配置在系统环境变量PATH中
export USE_HADOOP=true
# Hadoop配置文件目录
export HADOOP_CONF_DIR=/etc/hadoop/hadoop-conf
将服务配置文件zeppelin-site.xml.template更名为zeppelin-site.xml,并修改:


zeppelin.server.addr
0.0.0.0
Server binding address




zeppelin.server.port
18080
Server port.
最基础的配置就完成了。运行bin/zeppelin-daemon.sh start命令,返回Zeppelin start [ OK ]的提示之后,访问<服务器地址>:18080,出现下面的页面,就表示Zeppelin服务启动成功。

f9f905375ff5840ce3661de703ab134a.webp

当然,为了一步到位适应生产环境,也可以适当修改zeppelin-site.xml中的以下参数:


zeppelin.notebook.storage
org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo
Hadoop compatible file system notebook persistence layer implementation, such as local file system, hdfs, azure wasb, s3 and etc.




zeppelin.notebook.dir
/zeppelin/notebook
path or URI for notebook persist




zeppelin.recovery.storage.class
org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage
ReoveryStorage implementation based on hadoop FileSystem




zeppelin.recovery.dir
/zeppelin/recovery
Location where recovery metadata is stored




zeppelin.anonymous.allowed
true
Anonymous user allowed by default
Zeppelin集成了Shiro实现权限管理。禁止使用匿名用户之后,可以在conf目录下的shiro.ini中配置用户名、密码、角色等,不再赘述。注意每次修改配置都需要运行bin/zeppelin-daemon.sh restart重启Zeppelin服务。

配置Flink Interpreter on YARN

在使用Flink Interpreter之前,我们有必要对它进行配置,使Flink作业和Interpreter本身在YARN环境中运行。点击首页用户名区域菜单中的Interpreter项(上一节图中已经示出),搜索flink,就可以看到参数列表。

1be396e3d4880e088d46cb640e648df6.webp

Interpreter Binding

首先,将Interpreter Binding模式修改为Isolated per Note,如下图所示。

5e486699e8208be394f13d0b0d2b280b.webp

在这种模式下,每个Note在执行时会分别启动Interpreter进程,类似于Flink on YARN的Per-job模式,最符合生产环境的需要。

Flink on YARN参数

以下是需要修改的部分基础参数。注意这些参数也可以在Note中指定,每个作业自己的配置会覆盖掉这里的默认配置。
  • FLINK_HOME:Flink 1.11所在的目录;

  • HADOOP_CONF_DIR:Hadoop配置文件所在的目录;

  • flink.execution.mode:Flink作业的执行模式,指定为yarn以启用Flink on YARN;

  • flink.jm.memory:JobManager的内存量(MB);

  • flink.tm.memory:TaskManager的内存量(MB);

  • flink.tm.slot:TaskManager的Slot数;

  • flink.yarn.appName:YARN Application的默认名称;

  • flink.yarn.queue:提交作业的默认YARN队列。

Hive Integration参数

如果我们想访问Hive数据,以及用HiveCatalog管理Flink SQL的元数据,还需要配置与Hive的集成。
  • HIVE_CONF_DIR:Hive配置文件(hive-site.xml)所在的目录;

  • zeppelin.flink.enableHive:设为true以启用Hive Integration;

  • zeppelin.flink.hive.version:Hive版本号。

  • 复制与Hive Integration相关的依赖到$FLINK_HOME/lib目录下,包括:

      flink-connector-hive_2.11-1.11.0.jar
    flink-hadoop-compatibility_2.11-1.11.0.jar
    hive-exec-*.*.jar
    如果Hive版本是1.x,还需要额外加入hive-metastore-1.*.jar、libfb303-0.9.2.jar和libthrift-0.9.2.jar
  • 保证Hive元数据服务(Metastore)启动。注意不能是Embedded模式,即必须以外部数据库(MySQL、Postgres等)作为元数据存储。

Interpreter on YARN参数

在默认情况下,Interpreter进程是在部署Zeppelin服务的节点上启动的。随着提交的任务越来越多,就会出现单点问题。因此我们需要让Interpreter也在YARN上运行,如下图所示。

9c6c99b0a8adadbcbafa26ad14e55ae5.webp

  • zeppelin.interpreter.yarn.resource.cores:Interpreter Container占用的vCore数量

  • zeppelin.interpreter.yarn.resource.memory:Interpreter Container占用的内存量(MB)

  • zeppelin.interpreter.yarn.queue:Interpreter所处的YARN队列名称

配置完成之后,Flink on Zeppelin集成完毕,可以测试一下了。

测试Flink SQL on Zeppelin

创建一个Note,Interpreter指定为flink。然后写入第一个Paragraph:

0c931a90b76090b607a10b84d5d86b31.webp

以%flink.conf标记的Paragraph用于指定这个Note中的作业配置,支持Flink的所有配置参数(参见Flink官网)。另外,flink.execution.packages参数支持以Maven GAV坐标的方式引入外部依赖项。接下来创建第二个Paragraph,创建Kafka流表:

fbb2d5000e894c6955dab5659153bdb7.webp

%flink.ssql表示利用StreamTableEnvironment执行流处理SQL,相对地,%flink.bsql表示利用BatchTableEnvironment执行批处理SQL。注意表参数中的properties.bootstrap.servers利用了Zeppelin Credentials来填写,方便不同作业之间复用。执行上述SQL之后会输出信息:

c9196fe32dd7a6fce5f7bb329dfda252.webp

同时在Hive中可以看到该表的元数据。最后写第三个Paragraph,从流表中查询,并实时展现出来:

933eb7d7ae0b6bb64057b59dbd13d1eb.webp

点击右上角的FLINK JOB标记,可以打开作业的Web UI。上述作业的JobGraph如下。

da04a3fe3be19e63d98fa822cb43b6f5.webp

除SELECT查询外,通过Zeppelin也可以执行INSERT查询,实现更加丰富的功能。关于Flink SQL on Zeppelin的更多应用,笔者在今后的文章中会继续讲解,今天就到这里吧。039039ab538d40fe5482243dce15de56.webpa9b2b20af2a471227fe53b538264d6b8.webp

版权声明:

本文为《大数据技术与架构》整理,原作者独家授权。未经原作者允许转载追究侵权责任。编辑|冷眼丶微信公众号|大数据技术与架构

文章不错?点个【在看】吧! ?

浏览 6
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报