聊聊Flink:Flink的容错机制

共 5519字,需浏览 12分钟

 ·

2024-05-21 07:30

一、前言

我们上一篇讲了 Flink的状态管理 ,这一篇我们来说一说Flink的容错机制。

在分布式计算系统中,为了保证数据的一致性需要对数据进行一致性快照。

Flink和Spark在做流式计算时,为了保证数据一致性都借鉴了Chandy-Lamport算法原理,Chandy-Lamport算法目标是让多个分布式节点本地数据以及通信中的数据完成local snapshot本地状态保存最终能一起完成global snapshot保存全局状态。只有了解分布式系统为了保证数据一致性的算法背景,才能更好理解Flink如何用Checkpoint来保证数据Exactly Once准确一次语义和何为barrier对齐。

1.1 Chandy-Lamport算法

Chandy-Lamport的“快照”算法描述了决定分布式系统全局状态的“快照”算法。该算法的目的是记录进程集Pi(i=1,2,…,N)的进程状态和通道状态集(快照)。这里的进程集类似Flink JobManager和TaskManager构成分布式架构的进程集。这样,即使所记录的状态组合可能从没有在同一时间发生,但所记录的全局状态还是一致的。Flink TaskManager多任务可异步完成各自的快照,等所有的快照保存完成通知JobManager来最终保证全局状态一致。此算法本身在进程本地记录状态,它没有给出在一个场地收集全局状态的方法。收集状态的一个简单方法是让所有进程把它们记录的状态发送到一个指定的收集进程,如Flink JobManager中CheckPoint Coordinator检查点协调器类似指定的所有进程的状态收集进程。

1.2 容错机制Checkpoint检查点理解

首先状态State与检查点Checkpoint之间关系:Checkpoint将某个时刻应用状态State进行快照Snapshot保存。

  • State:维护/存储的是某一个Operator的运行的状态/历史值,是维护在内存中。

  • Checkpoint:某一时刻,Flink中所有的Operator的当前State的全局快照,一般存在磁盘上。

二、Barrier

Checkpoint是Flink实现容错机制最核心的功能,它能够周期性地基于流中各个算子的状态来生成快照,从而将这些状态数据定期持久化存储下来。当Flink程序一旦意外崩溃时,重新运行程序时可以从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

Flink分布式快照的一个核心元素是流Barrier(屏障或栅栏)。这些Barrier被注入数据流中,并将记录作为数据流的一部分进行流处理。Barrier永远不会超过记录,它们严格地在一条线上流动。Barrier将数据流中的记录隔离成一系列的记录集合,即一个Barrier将数据流中的记录分离为进入当前快照的记录集和进入下一个快照的记录集。

每个Barrier都携带快照的ID,并且Barrier之前的记录都进入了该快照。Barrier不会中断数据流,非常轻量。来自不同快照的多个Barrier可以同时在流中,这意味着多个快照可能同时并发发生。单流Barrier在流中的位置如下图所示。


Barrier在数据流源处被注入并行数据流中。快照n的Barrier被注入的位置(用Sn表示)是快照所包含的数据在数据源中的最大位置。例如,在Apache Kafka中,这个位置将是分区中最后一条记录的偏移量。这个位置Sn会被报告给Checkpoint协调器(Flink的JobManager)。

然后这些Barrier就会顺流而下。当中间算子从所有输入流接收到快照n的Barrier时,它会向所有输出流发出快照n的Barrier。一旦Sink算子(流DAG的末端)从所有输入流接收到Barrier n,它就向Checkpoint协调器确认快照n完成。在所有的Sink都确认了一个快照之后,快照就被认为完成了。

一旦快照n完成,作业将不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)已经通过整个数据流拓扑,即已经被处理完毕。

如下图所示,接收多个输入流的算子需要基于快照Barrier对齐输入流。

一旦算子从一个输入流接收到快照Barrier n,它就不能再处理来自该流的任何记录(新到来的来自该流的其他记录不会被处理,而是输入缓冲区中),直到它从其他输入流接收到Barrier n。否则,它将弄混属于快照n的记录和属于快照n+1的记录。一旦最后一个流接收到Barrier n,算子就会发出所有挂起的(缓冲区中的)向后传送的记录,然后发出快照Barrier n本身。之后,开始恢复处理来自所有输入流的记录,在处理来自流的记录之前,优先处理来自输入缓冲区的记录。最后,算子将状态异步写入状态后端。

三、Checkpoint 执行流程

是不是还是有点抽象?我们结合上面的算法以及Barrier的相关概念画一下Checkpoint总体的执行流程。

再来看下详细版的Checkpoint 执行流程:


如上图所示,Checkpoint在执行过程中,可以简化为可以简化为以下四大步:

四、重启与故障恢复策略

当Task出现故障时,可以对故障的Task以及其他受影响的Task进行重启,以使作业恢复到正常执行状态。Flink通过重启策略和故障恢复策略来控制Task重启,重启策略控制是否可以重启以及重启的时间间隔,故障恢复策略控制哪些Task需要重启。

4.1 重启策略
每个重启策略都有自己的一组配置选项来控制其行为。这些选项可以在Flink的配置文件flink-conf.yaml中设置。restart-strategy用于定义在作业失败时使用的重启策略,可接受的值有:

  • none、off、disable:不重启策略。

  • fixeddelay、fixed-delay:固定延迟重启策略。

  • failurerate、failure-rate:故障率重启策略。

例如,配置固定延迟重启策略(默认使用该策略),代码如下:

restart-strategy: fixed-delay

也可以通过在应用程序中调用StreamExecutionEnvironment对象的setRestartStrategy方法进行设置。当然对于ExecutionEnvironment也同样适用。

如果配置了Checkpoint而没有配置重启策略,那么将默认使用固定延迟重启策略,程序会无限重启,此时最大尝试重启次数由Integer.MAX_VALUE参数决定;如果没有配置Checkpoint,则使用“不重启”策略;如果提交作业时设置了重启策略,该策略将覆盖掉集群的默认策略。

4.2 故障恢复策略

Flink中支持两种不同的故障恢复策略,可以在配置文件flink-conf.yaml中对属性jobmanager.execution.failover-strategy进行设置。该属性有两个值:full和region(默认)。

  • full:Task(任务)发生故障时重启作业中的所有Task进行故障恢复。

  • region:将作业中的Task分为数个不相交的Region(区域)。当有Task发生故障时,将计算进行故障恢复需要重启的最小Region集合。与重启所有Task相比,对于某些作业可能要重启的Task数量更少。

需要重启的最小Region集合的计算逻辑如下:

  • 发生错误的Task所在的Region需要重启,就会重启该Region的所有Task。

  • 如果要重启的Region需要消费的数据有部分无法访问(丢失或损坏),那么产出该部分数据的Region也需要重启。

  • 为了保障数据的一致性,需要重启的Region的下游Region也需要重启。因为对于一些非确定性的计算或者分发会导致同一个结果分区每次产生时包含的数据都不相同。

Region中的Task的数据交换是以Pipelined形式的,而非Batch形式,即Batch形式的Task数据交换不存在Region恢复策略。DataStream和流式Table/SQL作业的所有数据交换都是Pipelined形式的,而批处理式Table/SQL作业的所有数据交换默认都是Batch形式的。

五、Savepoint

Savepoint(保存点)是用户手动触发的Checkpoint,由用户手动创建、拥有和删除,它获取状态的快照并将其写入状态后端。Savepoint主要用于手动的状态数据备份和恢复,常用于在升级和维护集群的过程中保存状态数据,避免系统无法恢复到原有的计算状态。例如,需要升级程序之前先执行一次Savepoint,升级后可以继续从升级前的那个点进行计算,保证数据不中断。

Savepoint底层其实使用的也是Checkpoint机制。Savepoint与Checkpoint的对比如下表所示。


当需要对Flink作业进行停止、重启或者更新时,可以进行一次Savepoint,保存流作业的执行状态。

Flink的Savepoint与Checkpoint的不同之处类似于传统数据库中的备份与恢复日志。Checkpoint的主要目的是为意外失败的作业提供恢复机制。Checkpoint的生命周期由Flink管理,即Flink创建、管理和删除,无须用户交互。Savepoint由用户创建、拥有和删除,是一种有计划的手动备份和恢复。Savepoint的生成、恢复成本可能更高一些,Savepoint更多地关注可移植性和对作业更改的支持。

5.1 触发Savepoint

可以使用命令行客户端来触发Savepoint、触发Savepoint并取消作业、从Savepoint恢复以及删除Savepoint。从Flink 1.2.0开始,还可以使用WebUI从Savepoint恢复。

例如,触发ID为jobId的作业的Savepoint,并返回创建的Savepoint路径(需要此路径来还原和删除Savepoint),代码如下:

$ bin/flink savepoint jobId [targetDirectory]

targetDirectory表示需要创建的用于存储Savepoint数据的目标路径。如果不指定,则使用配置文件中state.savepoints.dir属性指定的默认路径:

state.savepoints.dir: hdfs://centos01:9000/savepoints

目标路径必须是JobManager和TaskManager可访问的位置,例如分布式文件系统上的位置。如果既未设置默认路径又未在执行命令时手动指定目标路径,则触发Savepoint将失败。

如果是基于YARN搭建的集群,则可以使用以下命令来触发Savepoint:

$ bin/flink savepoint jobId [targetDirectory] -yid yarnAppId

参数解析如下:

  • jobId:要触发的作业ID。·

  • targetDirectory:可选的用于存储Savepoint数据的目标路径,如果不指定,则使用配置的默认路径。

  • yarnAppId:YARN的应用程序ID。

5.2 触发Savepoint并取消作业

如果希望触发ID为jobId的作业的Savepoint并取消作业,代码如下:

$ bin/flink cancel -s [targetDirectory] jobId

5.3 通过Savepoint恢复作业

如果希望提交作业,并从Savepoint中恢复作业的状态数据,则可以使用以下代码:

$ bin/flink run -s savepointPath

savepointPath表示Savepoint数据存储的路径。Flink将从该路径中读取已经备份的状态数据。

5.4 删除Savepoint

如果希望删除存储在savepointPath中的Savepoint数据,则可以使用以下代码:

$ bin/flink savepoint -d savepointPath

当然,还可以通过常规文件系统手动删除Savepoint数据,而不会影响其他Savepoint或Checkpoint。



AI数据、云原生、物联网等相关领域的技术知识分享。


浏览 29
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报