Flink 特性 | Flink 1.12 资源管理新特性回顾
共 12635字,需浏览 26分钟
·
2021-07-17 14:14
内存管理 资源调度 扩展资源框架 未来规划 总结
一、内存管理
1. 本地内存(Managed Memory)
一方面是 slot 级别的预算规划,它可以保证作业运行过程中不会因为内存不足,造成某些算子或者任务无法运行;也不会因为预留了过多的内存没有使用造成资源浪费。同时 Flink 能保证当任务运行结束时准确将内存释放,确保 Task Manager 执行新任务时有足够的内存可用。
另一方面,资源适应性也是托管内存很重要的特性之一,指算子对于内存的需求是动态可调整的。具备了适应性,算子就不会因为给予任务过多的内存造成资源使用上的浪费,也不会因为提供的内存相对较少导致整个作业无法运行,使内存的运用保持在一定的合理范围内。
当然,在内存分配相对比较少情况下,作业会受到一定限制,例如需要通过频繁的落盘保证作业的运行,这样可能会影响性能。
RocksDB 状态后端:在流计算的场景中,每个 Slot 会使用 State 的 Operator,从而共享同一底层 的 RocksDB 缓存; Flink 内置算子:包含批处理、Table SQL、DataSet API 等算子,每个算子有独立的资源预算,不会相互共享; Python 进程:用户使用 PyFlink,使用 Python 语言定义 UDF 时需要启动 Python 的虚拟机进程。
2. Job Graph 编译阶段
■ 2.1 作业的 Job Graph 编译阶段
第一个问题是:slot 当中到底有哪些算子或者任务会同时执行。这个问题关系到在一个查询作业中如何对内存进行规划,是否还有其他的任务需要使用 management memory,从而把相应的内存留出来。在流式的作业中,这个问题是比较简单的,因为我们需要所有的算子同时执行,才能保证上游产出的数据能被下游及时的消费掉,这个数据才能够在整个 job grep 当中流动起来。但是如果我们是在批处理的一些场景当中,实际上我们会存在两种数据 shuffle 的模式。
一种是 pipeline 的模式,这种模式跟流式是一样的,也就是我们前面说到的 bounded stream 处理方式,同样需要上游和下游的算子同时运行,上游随时产出,下游随时消费。
另外一种是所谓的 batch 的 blocking的方式,它要求上游把数据全部产出,并且落盘结束之后,下游才能开始读数据。
这两种模式会影响到哪些任务可以同时执行。目前在 Flink 当中,根据作业拓扑图中的一个边的类型 (如图上)。我们划分出了定义的一个概念叫做 pipelined region,也就是全部都由 pipeline 的边锁连通起来的一个子图,我们把这个子图识别出来,用来判断哪些 task 会同时执行。
第二个问题是:slot 当中到底有哪些使用场景?我们刚才介绍了三种 manage memory 的使用场景。在这个阶段,对于流式作业,可能会出现 Python UDF 以及 Stateful Operator。这个阶段当中我们需要注意的是,这里并不能肯定 State Operator 一定会用到 management memory,因为这跟它的状态类型是相关的。
如果它使用了 RocksDB State Operator,是需要使用 manage memory 的;
但是如果它使用的是 Heap State Backend,则并不需要。
第三个问题:对于 batch 的作业,我们除了需要清楚有哪些使用场景,还需要清楚一件事情,就是前面提到过 batch 的 operator。它使用 management memory 是以一种算子独享的方式,而不是以 slot 为单位去进行共享。我们需要知道不同的算子应该分别分配多少内存,这个事情目前是由 Flink 的计划作业来自动进行设置的。
然而,作业在编译的阶段,其实并不知道状态的类型,这里是需要去注意的地方。
■ 2.2 执行阶段
其中一个是 RocksDB State Backend,有了第一步的判断之后,第二步我们会根据用户的配置,去决定不同使用方式之间怎么样去共享 slot 的 management memory。
在这个 Steaming 的例子当中,我们定义的 Python 的权重是 30%,State Backend 的权重是 70%。在这样的情况下,如果只有 Python,Python 的部分自然是使用 100% 的内存(Streaming 的 Heap State Backend 分支);
而对于第二种情况(Streaming 的 RocksDB State Backend 分支),B、C 的这两个 Operator 共用 30% 的内存用于 Python 的 UDF,另外 C 再独享 70% 的内存用于 RocksDB State Backend。最后 Flink 会根据 Task manager 的资源配置,一个 slot 当中有多少 manager memory 来决定每个 operator 实际可以用的内存的数量。
3. 参数配置
配置参数 | 默认值 | 备注 | |
---|---|---|---|
大小 | taskmanager.memory.managed.size | / | 绝对大小 |
权重 | taskmanager.memory.managed.fraction | 0.4 | 相对大小(占用Flink)总内存比例 |
taskmanager.memory.managed.consumer-weight | DATAPROC:70,PYTHON:30 | 多种用途并存时候分配权重 |
一种是绝对值的配置方式;
还有一种是作为 Task Manager 总内存的一个相对值的配置方式。
taskmanager.memory.managed.consumer-weight 是一个新加的配置项,它的数据类型是 map 的类型,也就是说我们在这里面实际上是给了一个 key 冒号 value,然后逗号再加上下一组 key 冒号 value 的这样的一个数据的结构。这里面我们目前支持两种 consumer 的 key:
一个是 DATAPROC, DATAPROC 既包含了流处理当中的状态后端 State Backend 的内存,也包含了批处理当中的 Batch Operator;
另外一种是 Python。
二、 资源调度
部分资源调度相关的 Feature 是其他版本或者邮件列表里面大家询问较多的,这里我们也做对应的介绍。
1. 最大 Slot 数
2. TaskManager 容错
3. 任务平铺分布
第一,这个参数我们只针对 Standalone 模式,因为在 yarn 跟 k8s 的模式下,实际上是根据你作业的需求来决定起多少 task manager 的,所以是先有了需求再有 TaskManager,而不是先有 task manager,再有 slot 的调度需求。
在每次调度任务的时候,实际上只能看到当前注册上来的那一个 TaskManager,Flink 没办法全局的知道后面还有多少 TaskManager 会注册上来,这也是很多人在问的一个问题,就是为什么特性打开了之后好像并没有起到一个很好的效果。
第二个需要注意的点是,这里面我们只能决定每一个 TaskManager 上有多少空闲 slot,然而并不能够决定每个 operator 有不同的并发数,Flink 并不能决定说每个 operator 是否在 TaskManager 上是一个均匀的分布,因为在 flink 的资源调度逻辑当中,在整个 slot 的 allocation 这一层是完全看不到 task 的。
三、扩展资源框架
1. 背景
一个是 Flink AI Extended 的项目,是基于 Flink 的深度学习扩展框架,目前支持 TensorFlow、PyTorch 等框架的集成,它使用户可以将 TensorFlow 当做一个算子,放在 Flink 任务中。
另一个是 Alink,它是一个基于 Flink 的通用算法平台,里面也内置了很多常用的机器学习算法。
2. 使用扩展资源
需要支持该类扩展资源的配置与调度。用户可以在配置中指明对这类扩展资源的需求,如每个 TaskManager 上需要有一块 GPU 卡,并且当 Flink 被部署在 Kubernetes/Yarn 这类资源底座上时,需要将用户对扩展资源的需求进行转发,以保证申请到的 Container/Pod 中存在对应的扩展资源。
需要向算子提供运行时的扩展资源信息。用户在自定义算子中可能需要一些运行时的信息才能使用扩展资源,以 GPU 为例,算子需要知道它内部的模型可以部署在那一块 GPU 卡上,因此,需要向算子提供这些信息。
3. 扩展资源框架使用方法
首先为该扩展资源设置相关配置;
然后为所需的扩展资源准备扩展资源框架中的插件;
最后在算子中,从 RuntimeContext 来获取扩展资源的信息并使用这些资源。
■ 3.1 配置参数
# 定义扩展资源名称,“gpu”
external-resources: gpu
# 定义每个 TaskManager 所需的 GPU 数量
external-resource.gpu.amount: 1
# 定义Yarn或Kubernetes中扩展资源的配置键
external-resource.gpu.yarn.config-key: yarn.io/gpu
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
# 定义插件 GPUDriver 的工厂类。
external-resource.gpu.driver-factory.class:
org.apache.flink.externalresource.gpu.GPUDriverFactory
对于任何扩展资源,用户首先需要将它的名称加入 "external-resources" 中,这个名称也会被用作该扩展资源其他相关配置的前缀来使用。示例中,我们定义了一种名为 "gpu" 的资源。
在调度层,目前支持用户在 TaskManager 的粒度来配置扩展资源需求。示例中,我们定义每个 TaskManager 上的 GPU 设备数为 1。
将 Flink 部署在 Kubernetes 或是 Yarn 上时,我们需要配置扩展资源在对应的资源底座上的配置键,以便 Flink 对资源需求进行转发。示例中展示了 GPU 对应的配置。
如果提供了插件,则需要将插件的工厂类名放入配置中。
■ 3.2 前置准备
在 Standalone 模式下,集群管理员需要保证 GPU 资源对 TaskManager 进程可见; 在 Kubernetes 模式下,需要集群支持 Device Plugin[6],对应的 Kubernetes 版本为 1.10,并且在集群中安装了 GPU 对应的插件; 在 Yarn 模式下,GPU 调度需要集群 Hadoop 版本在 2.10 或 3.1 以上,并正确配置了 resource-types.xml 等文件。
■ 3.3 扩展资源框架插件
public interface ExternalResourceDriverFactory {
/**
* 根据提供的设置创建扩展资源的Driver
*/
ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}
public interface ExternalResourceDriver {
/**
* 获取所需数量的扩展资源信息
*/
Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}
4. GPU 插件
当调用脚本时,所需要的 GPU 数量将作为第一个参数输入,之后为用户自定义参数列表; 若脚本执行正常,则输出 GPU Index 列表,以逗号分隔; 若脚本出错或执行结果不符合预期,则脚本以非零值退出,这会导致 TaskManager 初始化失败,并在日志中打印脚本的错误信息。
5. 在算子中获取扩展资源信息
public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
private static finalRESOURCE_NAME="gpu";
public String map(String value) {
Set<ExternalResourceInfo> gpuInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
List<String> indexes = gpuInfos.stream()
.map(gpuInfo -> gpuInfo.getProperty("index").get()).collect(Collectors.toList());
// Map function with GPU// ...
}
}
6. MNIST Demo
class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {
public void open(Configuration parameters) {
//获取GPU信息并且选择第一块GPU
Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName);
final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index");
// 使用第一块GPU的index初始化JCUDA组件
JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));
JCublas.cublasInit();
}
}
class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {
public Integer map(List<Float> value) {
// 使用Jucblas做矩阵算法
JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f,
matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);
// 获得乘法结果并得出该图所表示的数字
JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);
JCublas.cublasFree(inputPointer);
JCublas.cublasFree(outputPointer);
int result = 0;
for (int i = 0; i < DIMENSIONS.f1; ++i) {
result = output[i] > output[result] ? i : result;
}
return result;
}
}
四、未来计划
除了上文介绍的这些已经发布的特性外,Apache Flink 社区也正在积极准备更多资源管理方面的优化特性,在未来的版本中将陆续和大家见面。
被动资源调度模式:托管内存使得 Flink 任务可以灵活地适配不同的 TaskManager/Slot 资源,充分利用可用资源,为计算任务提供给定资源限制下的最佳算力。但用户仍需指定计算任务的并行度,Flink 需要申请到满足该并行度数量的 TaskManager/Slot 才能顺利执行。被动资源调度将使 Flink 能够根据可用资源动态改变并行度,在资源不足时能够 best effort 进行数据处理,同时在资源充足时恢复到指定的并行度保障处理性能。 细粒度资源管理:Flink 目前基于 Slot 的资源管理与调度机制,认为所有的 Slot 都具有相同的规格。对于一些复杂的规模化生产任务,往往需要将计算任务拆分成多个子图,每个子图单独使用一个 Slot 执行。当子图间的资源需求差异较大时,使用相同规格的 Slot 往往难以满足资源效率方面的需求,特别是对于 GPU 这类成本较高的扩展资源。细粒度资源管理允许用户为作业的子图指定资源需求,Flink 会根据资源需求使用不同规格的 TaskManager/Slot 执行计算任务,从而优化资源效率。
五、总结
首先从本地内存、Job Graph 编译阶段、执行阶段来解答每个流程的内存管理以及内存分配细节,通过新的参数配置控制 TaskManager的内存分配; 然后从大家平时遇到资源调度相关问题,包括最大 Slot 数使用,如何进行 TaskManager 进行容错,任务如何通过任务平铺均摊任务资源; 最后在机器学习和深度学习领域常常用到 GPU 进行加速计算,通过解释 Flink 在 1.12 版本如何使用扩展资源框架和演示 Demo, 给我们展示了资源扩展的使用。再针对资源利用率方面提出 2 个社区未来正在做的计划,包括被动资源模式和细粒度的资源管理。
六、附录