Flink 实践 | 作业帮基于 Flink 的实时计算平台实践
摘要:本文整理自作业帮实时计算负责人张迎在 Flink Forward Asia 2021 的分享。在作业帮实时计算演进过程中,Flink 起到了重要的作用,特别是借助于 FlinkSQL 极大的提高了实时任务的开发效率。这篇文章主要分享 FlinkSQL 在作业帮的使用情况、实践经验,以及随着任务规模增长,在从 0 到 1 搭建实时计算平台的过程中遇到的问题及解决方案。内容包括:
发展历程 Flink SQL 应用实践 平台建设 总结展望
一、发展历程
作业帮主要运用人工智能、大数据等技术,为学生提供更高效的学习解决方案。因此业务上的数据,主要是学生的到课情况、知识点掌握的情况这些。整体架构上,无论是 binlog 还是普通日志,经过采集后写入 Kafka,分别由实时和离线计算写入存储层,基于 OLAP 再对外提供对应的产品化服务,比如工作台、BI 分析工具。
作业帮的实时计算目前基本以 Flink 为主,发展历程大概有三个阶段:
19 年,实时计算包含少量的 SparkStreaming 作业,提供到辅导老师、主讲侧。在解决实时需求的过程中,就会发现开发效率很低,数据几乎无法复用; 之后常规的做法,是在生产实践中逐步应用 Flink JAR,积累经验后开始搭建平台以及应用 Flink SQL。不过在 20 年,业务提出了非常多的实时计算需求,而我们开发人力储备不足。当时 Flink SQL 1.9 发布不久,SQL 功能变化较大,所以我们的做法是直接在实时数仓方向应用 Flink SQL,目前整个实时数仓超过 90% 的任务都是使用 Flink SQL 实现的; 到了 20 年 11 月份,Flink 作业很快增加到几百条,我们开始从 0 到 1 搭建实时计算平台,已经支持了公司全部重要的业务线,计算部署在多个云的多个集群上。
接下来介绍两个方面:
FlinkSQL 实践遇到的典型问题以及解决方案; 实时计算平台建设过程中的一些思考。
二、Flink SQL 应用实践
同时,考虑实际应用时,也需要在元数据表的基础上,能够对表属性进行新增或者替换:
新增:元数据记录的是表级别的属性,但是 SQL 作业里可能需要增加任务级别的属性。比如对于 Kafka 源表,增加作业的 group.id 来记录 offset; 替换:线下测试时,在引用元数据表的基础上,只需要定义 broker topic 等属性覆盖源表,这样可以快速的构建一个线下测试表。
2.1 SQL 增加 Trace 功能
注:我们当时是基于 1.9 开发的,这里为了讲述清楚,也使用了一些后来加入的 feature https://issues.apache.org/jira/browse/FLINK-16361 https://issues.apache.org/jira/browse/FLINK-18840
从上图下方的实际 DAG 看不太符合预期:
DAG 被分成了上下不相关的两部分,Kafka 源表也就是DataSource部分,读取了两次; foo 方法调用了三次。
2.2 Table 的选型及设计
我们的解决方案是基于 Redis 实现,首先有几点好处:
高 qps、低延迟:这个应该是所有实时计算都关注的; TTL:用户不用关心数据如何退场,给定一个合理的 TTL 就可以了; 通过使用 protobuf 等高性能且紧凑的序列化方式,以及使用 TTL,存储上整体不到 200G,redis 的内存压力可以接受; 贴合计算模型:计算本身为了确保时序性,会进行 keyBy 的操作,把需要同时处理的数据 shuffle 到同一并发上,因此也不依赖存储过多考虑锁的优化。
上图举了一个学生在某个章节是否到课的表的例子:
多索引:数据首先按照 string 格式存储,比如 key=(uid, lesson_id), value=serialize(is_attend, ...),这样我们就可以在 SQL 里 JOIN ON uid AND lesson_id 了。如果 JOIN ON 其他字段,比如 lesson_id 怎么办?我们的做法,是会同时写入一个 lesson_id 为 key 的 set,set 里的元素是对应的 (uid, lesson_id)。接下来查找 lesson_id = 123 时,先取出该 set 下所有元素,然后再通过 pipeline 的方式查找到所有的 VALUE 返回; 触发消息:写入 redis 后,会同时写入一条更新消息到 Kafka. 两个存储之间的一致性、顺序性、不丢数据都在 Redis Connector 的实现里保证。
DDL 里几个比较重要的属性:
primary 定义了主键,对应 string 的数据结构,例如例子里的 uid + lesson_id; index.fields 定义了辅助查找的索引字段,例如例子里的 lesson_id;索引也可以定义多个; poster.kafka 定义接收触发消息的 kafka 表,这个表同样定义在了元数据,用户可以在后续的 SQL 作业里无需定义直接读取该表。
三、平台建设
平台支持的功能,出发点主要有 3 个:
统一:统一不同云厂商不同的集群环境、Flink 版本、提交方式等;之前 hadoop 客户端散落在用户的提交机上,对集群数据、任务安全都有隐患,同时增加了集群后续的升级、迁移成本。我们希望通过平台统一任务的提交入口以及提交方式; 易用:通过平台交互能够提供更多易用功能,比如调试、语义检测,这些都能提高任务测试的人效,以及记录任务的版本历史支持方便的上线及回滚操作; 规范:权限控制、流程审批等,类似于在线服务的上线流程,通过平台,能够把实时任务的研发流程规范起来。
3.1 规范 - 实时任务流程管理
但是规范还是要执行,有些问题类似在线服务,实时计算里也会遇到:
记不清:任务在线上跑了一年,最初的需求可能是口口相传,好一点记了 wiki 或者邮件,但是都容易在任务交接中记不清楚; 不规范:UDF 也好,DataStream 的代码也好,都没有遵守规范,可读性差,导致后面接手的同学升级改不动、或者不敢改,没法长久的维护下去。包括实时任务的 SQL 怎么写也应该有规范; 找不到:线上运行中的任务,依赖了某个 jar,对应的是哪个 git 模块的哪个 commitId,出了问题怎么第一时间找到对应的代码实现; 瞎修改:一直正常的任务,周末突然报警了,原因是私自修改了线上任务的 SQL。
规范主要分为三部分:
开发:RD 可以从 UDF archetype 项目上快速创建一个 UDF 模块,这个是参考了 flink quickstart。创建出来的 UDF 模块可以正常编译,包含了类似 WordCount 这种 udf 示例,也有默认的 ReadMe、VersionHelper 这些辅助方法。按照业务需求修改后,经过 CR 上传到 Git; 需求管理、编译:提交的代码会关联到需求卡片上,经过集群编译、QA测试,才能发单上线; 上线:根据模块及编译产出,选择更新/创建哪些作业,经过作业 owner 或者 leader 审批后,重新部署。
3.2 易用 - 监控
我们目前的 Flink 作业都运行在 Yarn 上。作业启动后,预期是 Prometheus 来抓取 Yarn 分配的 Container,然后对接报警系统,用户就可以基于报警系统配置 Kafka 延迟、Checkpoint 失败这些报警。在搭建这条通路时主要遇到了两个问题:
PrometheusReporter 启动 HTTPServer 后,Prometheus 怎么能动态感知;也需要能够控制 metric 的大小,避免采集大量无用数据; 我们 SQL 的源表,基本是以 Kafka 为主。相比第三方的工具,在计算平台上配置 Kafka 延迟报警会更加方便。因为能够天然的拿到任务读取的 topic、group.id,同时也可以跟任务失败使用同一个报警组。再配合上报警模板,配置报警非常简便。
在官方 PrometheusReporter 的基础上增加了 discovery 的功能。Container 的 HTTPServer 启动后,把对应的 ip:port 以临时节点的形式注册到 zk 上,然后利用 Prometheus 的 discover targets 监听 zk 节点的变化。由于是临时节点,Container 销毁时节点消失,Prometheus 也能够感知不再抓取。这样就很简便的搭建起来 Prometheus 抓取的通路。 KafkaConsumer.records-lag 是比较实用、重要的延迟指标,主要做了两个工作。修改 KafkaConnector,在 KafkaConsumer.poll 之后再 expose 出来,确保 records-lag 指标可见。另外在做这个的过程中,发现不同 Kafka 版本的这个指标格式不同(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74686649),我们的做法是都打平为一种格式,注册到 flink 的 metrics 里。这样不同版本暴露出来的指标是一致的。
四、总结展望
下一步规划主要分为三部分:
支持资源弹性伸缩,平衡实时作业的成本以及时效性; 我们是从 1.9 开始大规模应用 Flink SQL 的,现在版本升级变化很大,需要考虑如何让业务能够低成本的升级使用新版本里 feature; 探索流批一体在实际业务场景上的落地。
往期精选
戳我,查看原文视频~
评论