在使用yarn cluster模式提交flink的任务时候,往往会涉及到很多内存参数的配置
例如下面的提交命令:
flink run -d -m yarn-cluster -yjm 512 -ytm 5028 -yD jobmanager.memory.off-heap.size=64m -yD jobmanager.memory.jvm-metaspace.size=128m -yD jobmanager.memory.jvm-overhead.min=64m -yD taskmanager.memory.jvm-metaspace.size=128m -yD taskmanager.memory.jvm-overhead.max=192m -yD taskmanager.memory.network.max=128m -yD taskmanager.memory.managed.size=64m -c com.xxx.xxx ./xxx.jar
我们需要知道Jobmanager和Taskmanager对应的内存模型,来分配相应的内存,让内存利用率最大化。
JobManager内存模型:
参数设置:
- jobmanager.memory.process.size:对应到图中的 Total Process Memory 。对应到 -yjm。
-
jobmanager.memory.flink.size:对应到图中的Total Flink Memory,作业管理器的总Flink内存大小。这包括JobManager消耗的所有内存,除了JVM元空间和JVM开销,它由JVM堆内存和堆外内存组成。
- jobmanager.memory.heap.size :对应到图中的JVM Head:JobManager的JVM堆内存大小。
-
jobmanager.memory.off-heap.size:默认值:128mb,对应到图中的Off-Heap Memory。JobManager的堆外内存。
-
jobmanager.memory.jvm-metaspace.size:默认值:256mb ,对应到图中的JVM Metaspace。JobManager JVM 进程的 Metaspace。
-
JVM Overhead,是用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等,JVM开销的大小是为了弥补总进程内存中配置的部分。
-
jobmanager.memory.jvm-overhead.fraction:默认值 0.1(Total Process Memory的0.1)。
- jobmanager.memory.jvm-overhead.min:默认值192mb
-
jobmanager.memory.jvm-overhead.max:默认值1gb
- 按照比例算,如果内存大小小于或大于配置的最小或最大大小,则将使用最小或最大大小。可以通过将最小和最大大小设置为相同的值,可以显式指定JVM开销的确切大小。
-
那么如果设置了 -yjm 1024 ,JobManager的JVM的堆内存大小是多少呢?
首先,在没有显示的设置其它内存的情况下,所有的内存组成部分都用默认值
- JVM Metaspace:256mb
- JVM Overhead:1024 * 0.1 = 102.4 由于102.4 < 192 所以JVM Overhead的大小为192mb
- Total Flink Memory:1024 - 256 - 192 = 576
- Off-Heap Memory:128mb
- JVM Overhead:576 - 128 = 448mb
通过观察启动日志也可以发现:JVM Overhead的大小为448mb
有点时候,我们会希望运用到JVM Head的内存更多一点,那么就可以尝试着把其它的部分调低,来给到JVM Head更多的内存。
TaskManager的内存模型:
参数设置:
- taskmanager.memory.process.size:对应到图中的Total Process Memory,TaskExecutors的总进程内存大小。这包括TaskExecutor消耗的所有内存,包括总Flink内存、JVM元空间和JVM开销。对应到 -ytm。
-
taskmanager.memory.flink.size:对应到图中的Total Flink Memory,TaskExecutors的总Flink内存大小。它由框架堆内存、任务堆内存、任务堆外内存、托管内存和网络内存组成。
-
taskmanager.memory.framework.heap.size:默认值128mb,对应到图中的Framework Heap。用于 Flink 框架的 JVM 堆内存。
-
taskmanager.memory.task.heap.size:对应到图中的Task Heap。用于 Flink 应用的算子及用户代码的 JVM 堆内存,
- 托管内存:对应到图中的Managed Memory,流处理作业中使用 RocksDB State Backend,批处理作业中用于排序、哈希表及缓存中间结果
-
taskmanager.memory.managed.size
-
taskmanager.memory.managed.fraction :默认0.4
-
如果未明确指定托管内存大小,托管内存为Flink内存总量(Total Flink Memory)的0.4。
-
-
taskmanager.memory.framework.off-heap.size:默认值128mb,对应到图中的Framework Off-Head。用于Flink框架的堆外内存。
-
taskmanager.memory.task.off-heap.size:默认0,对应到图中的Task Off-Head 。用于 Flink 应用的算子及用户代码的堆外内存。
-
网络内存(Network Memory),对应到图中的 Network ,用于任务之间数据传输的直接内存(例如网络传输缓冲)
- taskmanager.memory.network.min:默认值64mb
-
taskmanager.memory.network.max:默认值1gb
-
taskmanager.memory.network.fraction:默认值0.1(Total Flink Memory)
- 网络内存是为ShuffleEnvironment保留的堆外内存(例如,网络缓冲区)。按照比例算,如果内存大小小于/大于配置的最小/最大大小,则将使用最小/最大大小。通过将min/max设置为相同的值,可以明确指定网络内存的确切大小。
-
taskmanager.memory.jvm-metaspace.size:默认值256mb,对应到图中的JVM Metaspace,Flink JVM 进程的 Metaspace。
-
JVM 开销:对应到JVM Overhead,用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。
-
taskmanager.memory.jvm-overhead.min:默认值192mb
-
taskmanager.memory.jvm-overhead.max:默认值1gb
-
taskmanager.memory.jvm-overhead.fraction:默认值0.1(Total Process Memory)
-
那么如果设置了 -ytm 1024 、Managed Memory 为100mb,TaskManager的JVM的堆内存大小是多少呢?
首先,在没有显示的设置其它内存的情况下,所有的内存组成部分都用默认值
- JVM Metaspace:256mb
- JVM Overhead:1024 * 0.1 = 102.4 由于102.4 < 192 所以JVM Overhead的大小为192mb
- Framework Heap:128mb
- Framework Off-Head:128mb
- Task Off-Head : 0
- Total Flink Memory = 1024 - 256 - 192 = 576mb
- Managed Memory = 100mb
- Network Memory = 576 * 0.1 < 64 所以Network Memory:64mb
- JVM Head:576 - 100 - 128 - 64 = 284mb
- Task Head:284 - 128 = 156mb
在启动日志中也能够看得到:
在设置内存的时候 如果托管内存(Managed Memory)用不到的话,可以设置为0,以至于能将更多的内存用于JVM Head内存上。
并行度和TaskManager的关系
- -ys 设置一个taskmanager的slot个数
- -p 设置任务的并行度
taskmanager的数量 = p / ys + 1
jobmanager的数量 = 1
消耗的container的数量 = TaskManager的数量+1,+1是因为还有JobManager这个进程。
消耗的vcore的数量 = TaskManager的数量 * solt + 1,+1理由同上
消耗的yarn内存数 = jobmanager的数量 * yjm + taskmanager的数量 * ytm