内存管理
资源调度扩展
资源框架
未来规划
一、内存管理
首先回顾 Flink 的内存模型变迁。下图左边分别为 Flink 1.10、Flink 1.11 引入的新的内存模型。尽管涉及的模块较多,但 80% - 90% 的用户仅需关注真正用于任务执行的 Task Heap Memory、Task Off-Heap Memory、Network Memory、Managed Memory 四部分。
其它模块大部分是 Flink 的框架内存,正常不需要调整,即使遇到问题也可以通过社区文档来解决。除此之外,“一个作业究竟需要多少内存才能满足实际生产需求” 也是大家不得不面临的问题,比如其他指标的功能使用、作业是否因为内存不足影响了性能,是否存在资源浪费等。
针对上述内容,社区在 Flink 1.12 版本提供了一个全新的, 关于 Task manager 和 Job
manager 的 Web UI。
在新的 Web UI 中,可以直接将每一项监控指标配置值、实际使用情况对应到内存模型中进行直观的展示。在此基础上,可以更清楚的了解到作业的运行情况、该如何调整、用哪些配置参数调整等 (社区也有相应的文档提供支持)。通过新的 Web UI,大家能更好的了解作业的使用情况,内存管理也更方便。
1. 本地内存(Managed Memory)
Flink 托管内存实际上是 Flink 特有的一种本地内存,不受 JVM 和 GC 的管理,而是由 Flink 自行进行管理。
本地内存的特点主要体现在两方面:
一方面是 slot 级别的预算规划,它可以保证作业运行过程中不会因为内存不足,造成某些算子或者任务无法运行;也不会因为预留了过多的内存没有使用造成资源浪费。 同时 Flink 能保证当任务运行结束时准确将内存释放,确保 Task Manager 执行新任务时有足够的内存可用。
另一方面,资源适应性也是托管内存很重要的特性之一,指算子对于内存的需求是动态可调整的。具备了适应性,算子就不会因为给予任务过多的内存造成资源使用上的浪费,也不会因为提供的内存相对较少导致整个作业无法运行,使内存的运用保持在一定的合理范围内。当然,在内存分配相对比较少情况下,作业会受到一定限制,例如需要通过频繁的落盘保证作业的运行,这样可能会影响性能。
当前,针对托管内存,Flink 的使用场景如下:
RocksDB 状态后端:在流计算的场景中,每个 Slot 会使用 State 的 Operator,从而共享同一底层 的 RocksDB 缓存;
Flink 内置算子:包含批处理、Table SQL、DataSet API 等算子,每个算子有独立的资源预算,不会相互共享;
Python 进程:用户使用 PyFlink,使用 Python 语言定义 UDF 时需要启动 Python 的虚拟机进程。
2. Job Graph 编译阶段
Flink 对于 management memory 的管理主要分为两个阶段。
2.1 作业的 Job Graph 编译阶段
在这个阶段需要注意三个问题:
第一个问题是:slot 当中到底有哪些算子或者任务会同时执行。这个问题关系到在一个查询作业中如何对内存进行规划,是否还有其他的任务需要使用 management memory,从而把相应的内存留出来。 在流式的作业中,这个问题是比较简单的,因为我们需要所有的算子同时执行,才能保证上游产出的数据能被下游及时的消费掉,这个数据才能够在整个 job grep 当中流动起来。 但是如果我们是在批处理的一些场景当中,实际上我们会存在两种数据 shuffle 的模式,一种是 pipeline 的模式,这种模式跟流式是一样的,也就是我们前面说到的 bounded stream 处理方式,同样需要上游和下游的算子同时运行,上游随时产出,下游随时消费。另外一种是所谓的 batch 的 blocking的方式,它要求上游把数据全部产出,并且落盘结束之后,下游才能开始读数据。这两种模式会影响到哪些任务可以同时执行。目前在 Flink 当中,根据作业拓扑图中的一个边的类型 (如图上)。我们划分出了定义的一个概念叫做 pipelined region,也就是全部都由 pipeline 的边锁连通起来的一个子图,我们把这个子图识别出来,用来判断哪些 task 会同时执行。
第二个问题是:slot 当中到底有哪些使用场景?我们刚才介绍了三种 manage memory 的使用场景。在这个阶段,对于流式作业,可能会出现 Python UDF 以及 Stateful Operator。这个阶段当中我们需要注意的是,这里并不能肯定 State Operator 一定会用到 management memory,因为这跟它的状态类型是相关的。如果它使用了 RocksDB State Operator,是需要使用 manage memory 的;但是如果它使用的是 Heap State Backend,则并不需要。然而,作业在编译的阶段,其实并不知道状态的类型,这里是需要去注意的地方。
第三个问题:对于 batch 的作业,我们除了需要清楚有哪些使用场景,还需要清楚一件事情,就是前面提到过 batch 的 operator。它使用 management memory 是以一种算子独享的方式,而不是以 slot 为单位去进行共享。我们需要知道不同的算子应该分别分配多少内存,这个事情目前是由 Flink 的计划作业来自动进行设置的。
2.2 执行阶段
第一个步骤是根据 State Backend 的类型去判断是否有 RocksDB。如上图所示,比如一个 slot,有 ABC 三个算子,B 跟 C 都用到了 Python,C 还用到了 Stateful 的 Operator。这种情况下,如果是在 heap 的情况下,我们走上面的分支,整个 slot 当中只有一种在使用,就是Python。之后会存在两种使用方式:
其中一个是 RocksDB State Backend,有了第一步的判断之后,第二步我们会根据用户的配置,去决定不同使用方式之间怎么样去共享 slot 的 management memory。
在这个 Steaming 的例子当中,我们定义的 Python 的权重是 30%,State Backend 的权重是 70%。在这样的情况下,如果只有 Python,Python 的部分自然是使用 100% 的内存(Streaming 的 Heap State Backend 分支);
而对于第二种情况(Streaming 的 RocksDB State Backend 分支),B、C 的这两个 Operator 共用 30% 的内存用于 Python 的 UDF,另外 C 再独享 70% 的内存用于 RocksDB State Backend。最后 Flink 会根据 Task manager 的资源配置,一个 slot 当中有多少 manager memory 来决定每个 operator 实际可以用的内存的数量。
批处理的情况跟流的情况有两个不同的地方,首先它不需要去判断 State Backend 的类型,这是一个简化; 其次对于 batch 的算子,上文提到每一个算子有自己独享的资源的预算,这种情况下我们会去根据使用率算出不同的使用场景需要多少的 Shared 之后,还要把比例进一步的细分到每个 Operator。
3. 参数配置
上方图表展示了我们需要的是 manager,memory 大小有两种配置方式:
一种是绝对值的配置方式,
还有一种是作为 Task Manager 总内存的一个相对值的配置方式。
taskmanager.memory.managed.consumer-weight 是一个新加的配置项,它的数据类型是 map 的类型,也就是说我们在这里面实际上是给了一个 key 冒号 value,然后逗号再加上下一组 key 冒号 value 的这样的一个数据的结构。这里面我们目前支持两种 consumer 的 key:
一个是 DATAPROC, DATAPROC 既包含了流处理当中的状态后端 State Backend 的内存,也包含了批处理当中的 Batch Operator;
另外一种是 Python。
二、 资源调度
部分资源调度相关的 Feature 是其他版本或者邮件列表里面大家询问较多的,这里我们也做对应的介绍。
1. 最大 Slot 数
Flink 在 1.12 支持了最大 slot 数的一个限制(slotmanager.number-of-slots.max),在之前我们也有提到过对于流式作业我们要求所有的 operator 同时执行起来,才能够保证数据的顺畅的运行。在这种情况下,作业的并发度决定了我们的任务需要多少个 slot 和资源去执行作业。
然而对于批处理其实并不是这样的,批处理作业往往可以有一个很大的并发度,但实际并不需要这么多的资源,批处理用很少的资源,跑完前面的任务腾出 Slot 给后续的任务使用。通过这种串行的方式去执行任务能避免 YARN/K8s 集群的资源过多的占用。目前这个参数支持在 yarn/mesos/native k8 使用。
2. TaskManager 容错
在我们实际生产中有可能会有程序的错误、网络的抖动、硬件的故障等问题造成 TaskManager 无法连接,甚至直接挂掉。我们在日志中常见的就是 TaskManagerLost 这样的报错。对于这种情况需要进行作业重启,在重启的过程中需要重新申请资源和重启 TaskManager 进程,这种性能消耗代价是非常高昂的。
对于稳定性要求相对比较高的作业,Flink1.12 提供了一个新的 feature,能够支持在 Flink 集群当中始终持有少量的冗余的 TaskManager,这些冗余的 TaskManager 可以用于在单点故障的时候快速的去恢复,而不需要等待一个重新的资源申请的过程。
通过配置 slotmanager.redundant-taskmanager-num 可以实现冗余 TaskManager。这里所谓的冗余 TaskManager 并不是完完全全有两个 TaskManager 是空负载运行的,而是说相比于我所需要的总共的资源数量,会多出两个 TaskManager。
任务可能是相对比较均匀的分布在上面,在能够在利用空闲 TaskManager 的同时,也能够达到一个相对比较好的负载。 一旦发生故障的时候,可以去先把任务快速的调度到现有的还存活的 TaskManager 当中,然后再去进行新一轮的资源申请。目前这个参数支持在 yarn/mesos/native k8 使用。
3. 任务平铺分布
任务平铺问题主要出现在 Flink Standalone 模式下或者是比较旧版本的 k8s 模式部署下的。在这种模式下因为事先定义好了有多少个 TaskManager,每个 TaskManager 上有多少 slot,这样会导致经常出现调度不均的问题,可能部分 manager 放的任务很满,有的则放的比较松散。
在 1.11 的版本当中引入了参数 cluster.evenly-spread-out-slots,这样的参数能够控制它,去进行一个相对比较均衡的调度。
注意:
第一,这个参数我们只针对 Standalone 模式,因为在 yarn 跟 k8s 的模式下,实际上是根据你作业的需求来决定起多少 task manager 的,所以是先有了需求再有 TaskManager,而不是先有 task manager,再有 slot 的调度需求。在每次调度任务的时候,实际上只能看到当前注册上来的那一个 TaskManager,Flink 没办法全局的知道后面还有多少 TaskManager 会注册上来,这也是很多人在问的一个问题,就是为什么特性打开了之后好像并没有起到一个很好的效果,这是第一件事情。
第二个需要注意的点是,这里面我们只能决定每一个 TaskManager 上有多少空闲 slot,然而并不能够决定每个 operator 有不同的并发数,Flink 并不能决定说每个 operator 是否在 TaskManager 上是一个均匀的分布,因为在 flink 的资源调度逻辑当中,在整个 slot 的 allocation 这一层是完全看不到 task 的。
三、扩展资源框架
1. 背景
近年来,随着人工智能领域的不断发展,深度学习模型已经被应用到了各种各样的生产需求中,比较典型的场景如推荐系统,广告推送,智能风险控制。这些也是 Flink 一直以来被广泛使用的场景,因此,支持人工智能一直以来都是 Flink 社区的长远目标之一。针对这个目标,目前已经有了很多第三方的开源扩展工作。由阿里巴巴开源的工作主要有两个:
一个是 Flink AI Extended 的项目,是基于 Flink 的深度学习扩展框架,目前支持 TensorFlow、PyTorch 等框架的集成,它使用户可以将 TensorFlow 当做一个算子,放在 Flink 任务中。
另一个是 Alink,它是一个基于 Flink 的通用算法平台,里面也内置了很多常用的机器学习算法。
以上的两个工作都是从功能性上对 Flink 进行扩展,然而从算力的角度上讲,深度学习模型亦或机器学习算法,通常都是整个任务的计算瓶颈所在。GPU 则是这个领域被广泛使用用来加速训练或者预测的资源。因此,支持 GPU 资源来加速计算是 Flink 在 AI 领域的发展过程中必不可少的功能。
2. 使用扩展资源
目前 Flink 支持用户配置的资源维度只有 CPU 与内存,而在实际使用中,不仅是 GPU,我们还会遇到其他资源需求,如 SSD 或 RDMA 等网络加速设备。因此,我们希望提供一个通用的扩展资源框架,任何扩展资源都可以以插件的形式来加入这个框架,GPU 只是其中的一种扩展资源。
对于扩展资源的使用,可以抽象出两个通用需求:
需要支持该类扩展资源的配置与调度。用户可以在配置中指明对这类扩展资源的需求,如每个 TaskManager 上需要有一块 GPU 卡,并且当 Flink 被部署在 Kubernetes/Yarn 这类资源底座上时,需要将用户对扩展资源的需求进行转发,以保证申请到的 Container/Pod 中存在对应的扩展资源。
需要向算子提供运行时的扩展资源信息。用户在自定义算子中可能需要一些运行时的信息才能使用扩展资源,以 GPU 为例,算子需要知道它内部的模型可以部署在那一块 GPU 卡上,因此,需要向算子提供这些信息。
3. 扩展资源框架使用方法
使用资源框架我们可以分为以下这 3 个步骤:
首先为该扩展资源设置相关配置;
然后为所需的扩展资源准备扩展资源框架中的插件;
最后在算子中,从 RuntimeContext 来获取扩展资源的信息并使用这些资源
3.1 配置参数
- # 定义扩展资源名称,“gpu”external-resources: gpu# 定义每个 TaskManager 所需的 GPU 数量external-resource.gpu.amount: 1 # 定义Yarn或Kubernetes中扩展资源的配置键external-resource.gpu.yarn.config-key: yarn.io/gpuexternal-resource.gpu.kubernetes.config-key: nvidia.com/gpu# 定义插件 GPUDriver 的工厂类。external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
以上是使用 GPU 资源的配置示例:
对于任何扩展资源,用户首先需要将它的名称加入 "external-resources" 中,这个名称也会被用作该扩展资源其他相关配置的前缀来使用。示例中,我们定义了一种名为 "gpu" 的资源。
在调度层,目前支持用户在 TaskManager 的粒度来配置扩展资源需求。示例中,我们定义每个 TaskManager 上的 GPU 设备数为 1。
将 Flink 部署在 Kubernetes 或是 Yarn 上时,我们需要配置扩展资源在对应的资源底座上的配置键,以便 Flink 对资源需求进行转发。示例中展示了 GPU 对应的配置。
如果提供了插件,则需要将插件的工厂类名放入配置中。
3.2 前置准备
在实际使用扩展资源前,还需要做一些前置准备工作,以 GPU 为例:
在 Standalone 模式下,集群管理员需要保证 GPU 资源对 TaskManager 进程可见。
在 Kubernetes 模式下,需要集群支持 Device Plugin[6],对应的 Kubernetes 版本为 1.10,并且在集群中安装了 GPU 对应的插件。
在 Yarn 模式下,GPU 调度需要集群 Hadoop 版本在 2.10 或 3.1 以上,并正确配置了 resource-types.xml 等文件。
3.3 扩展资源框架插件
完成了对扩展资源的调度后,用户自定义算子可能还需要运行时扩展资源的信息才能使用它。扩展资源框架中的插件负责完成该信息的获取,它的接口如下:
- public interface ExternalResourceDriverFactory { ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;}public interface ExternalResourceDriver { Set extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;}
ExternalResourceDriver 会在各个 TaskManager 上启动,扩展资源框架会调用各个 Driver 的 retrieveResourceInfo 接口来获得 TaskManager 上的扩展资源信息,并将得到的信息传到算子的 RuntimeContext。ExternalResourceDriverFactory 则为插件的工厂类。
4. GPU 插件
Flink 目前内置了针对 GPU 资源的插件,其内部通过执行名为 Discovery Script 的脚本来获取当前环境可用的 GPU 信息,目前信息中包含了 GPU 设备的 Index。
Flink 提供了一个默认脚本,位于项目的 "plugins/external-resource-gpu/" 目录,用户也可以实现自定义的 Discovery Script 并通过配置来指定使用自定义脚本。该脚本与 GPU 插件的协议为:
当调用脚本时,所需要的 GPU 数量将作为第一个参数输入,之后为用户自定义参数列表。
若脚本执行正常,则输出 GPU Index 列表,以逗号分隔。
若脚本出错或执行结果不符合预期,则脚本以非零值退出,这会导致 TaskManager 初始化失败,并在日志中打印脚本的错误信息。
Flink 提供的默认脚本是通过 "nvidia-smi" 工具来获取当前的机器中可用的 GPU 数量以及 index,并根据所需要的 GPU 数量返回对应数量的 GPU Index 列表。当无法获取到所需数量的 GPU 时,脚本将以非零值退出。
GPU 设备的资源分为两个维度,流处理器与显存,其显存资源只支持独占使用。因此,当多个 TaskManager 运行在同一台机器上时,若一块 GPU 被多个进程使用,可能导致其显存 OOM。因此,Standalone 模式下,需要 TaskManager 级别的资源隔离机制。
默认脚本提供了 Coordination Mode 来支持单机中多个 TaskManager 进程之间的 GPU 资源隔离。该模式通过使用文件锁来实现多进程间 GPU 使用信息同步,协调同一台机器上多个 TaskManager 进程对 GPU 资源的使用。
5. 在算子中获取扩展资源信息
在用户自定义算子中,可使用在 "external-resources" 中定义的资源名称来调用 RuntimeContext 的 getExternalResourceInfos 接口获取对应扩展资源的信息。以 GPU 为例,得到的每个 ExternalResourceInfo 代表一块 GPU 卡,而其中包含名为 "index" 的字段代表该 GPU 卡的设备 Index。
- public class ExternalResourceMapFunction extends RichMapFunction
{ private static finalRESOURCE_NAME="gpu"; @Override public String map(String value) { Set gpuInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME); List indexes = gpuInfos.stream() .map(gpuInfo -> gpuInfo.getProperty( "index").get()).collect(Collectors.toList()); // Map function with GPU// ... }}
6. MNIST Demo
下图以 MNIST 数据集的识别任务来演示使用 GPU 加速 Flink 作业。
MNIST 如上图所示,为手写数字图片数据集,每个图片可表示为为 28*28 的矩阵。在该任务中,我们使用预训练好的 DNN 模型,图片输入经过一层全连接网络得到一个 10 维向量,该向量最大元素的下标即为识别结果。
我们在一台拥有两块 GPU 卡的 ECS 上启动一个有两个 TaskManager 进程的 Standalone 集群。借助默认脚本提供的 Coordination Mode 功能,我们可以保证每个 TaskManager 各使用其中一块 GPU 卡。
该任务的核心算子为图像识别函数 MNISTClassifier,核心实现如下所示
- class MNISTClassifier extends RichMapFunction
, Integer> {
@Override public void open(Configuration parameters) { //获取GPU信息并且选择第一块GPU Set externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName); final Optional firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index"); // 使用第一块GPU的index初始化JCUDA组件 JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get())); JCublas.cublasInit(); }}
在 Open 方法中,从 RuntimeContext 获取当前 TaskManager 可用的 GPU,并选择第一块来初始化 JCuda 以及 JCublas 库。
- class MNISTClassifier extends RichMapFunction
, Integer> {
@Override public Integer map(List value) { // 使用Jucblas做矩阵算法 JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f, matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1); // 获得乘法结果并得出该图所表示的数字 JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1); JCublas.cublasFree(inputPointer); JCublas.cublasFree(outputPointer); int result = 0; for (int i = 0; i < DIMENSIONS.f1; ++i) { result = output[i] > output[result] ? i : result; } return result; }}
在 Map 方法中,将预先训练好的模型参数与输入矩阵放入 GPU 显存,使用 JCublas 进行 GPU 中的矩阵乘法运算,最后将结果向量从 GPU 显存中取出并得到识别结果数字。
具体案例演示流程可以前往观看视频或者参考 Github 上面的链接动手尝试。
四、未来计划
除了上文介绍的这些已经发布的特性外,Apache Flink 社区也正在积极准备更多资源管理方面的优化特性,在未来的版本中将陆续和大家见面。
被动资源调度模式:托管内存使得 Flink 任务可以灵活地适配不同的 TaskManager/Slot 资源,充分利用可用资源,为计算任务提供给定资源限制下的最佳算力。但用户仍需指定计算任务的并行度,Flink 需要申请到满足该并行度数量的 TaskManager/Slot 才能顺利执行。被动资源调度将使 Flink 能够根据可用资源动态改变并行度,在资源不足时能够 best effort 进行数据处理,同时在资源充足时恢复到指定的并行度保障处理性能。
细粒度资源管理:Flink 目前基于 Slot 的资源管理与调度机制,认为所有的 Slot 都具有相同的规格。对于一些复杂的规模化生产任务,往往需要将计算任务拆分成多个子图,每个子图单独使用一个 Slot 执行。当子图间的资源需求差异较大时,使用相同规格的 Slot 往往难以满足资源效率方面的需求,特别是对于 GPU 这类成本较高的扩展资源。细粒度资源管理允许用户为作业的子图指定资源需求,Flink 会根据资源需求使用不同规格的 TaskManager/Slot 执行计算任务,从而优化资源效率。
五、总结
通过文章的介绍,相信大家对 Flink 内存管理有了更加清晰的认知。
首先从本地内存、Job Graph 编译阶段、执行阶段来解答每个流程的内存管理以及内存分配细节,通过新的参数配置控制 TaskManager的内存分配;
然后从大家平时遇到资源调度相关问题,包括最大 Slot 数使用,如何进行 TaskManager 进行容错,任务如何通过任务平铺均摊任务资源;
最后在机器学习和深度学习领域常常用到 GPU 进行加速计算,通过解释 Flink 在 1.12 版本如何使用扩展资源框架和演示 Demo, 给我们展示了资源扩展的使用。再针对资源利用率方面提出 2 个社区未来正在做的计划,包括被动资源模式和细粒度的资源管理。