Efficient memory use is critical for good performance, but the reverse is also true: inefficient memory use leads to bad performance.
Spark 的整体架构图如下:
图片
Spark 应用程序包括两个 JVM 进程:driver进程和executor进程。其中:
- driver进程是主控制进程,负责创建 SparkSession/SparkContext、提交作业、将作业转换为任务以及协调执行器之间的任务执行。
- executor进程主要负责执行特定的计算任务并将结果返回给驱动程序。driver的进程的内存管理相对简单,Spark并没有对此制定具体内存管理计划。
因此在这篇文章中,我们将会详细深入分析executor的内存管理。
2.Excutor内存模型
executor充当在工作节点上启动的 JVM 进程。因此,了解 JVM 内存管理非常重要。我们知道JVM 内存管理分为两种类型:
- 堆内存管理(In-Heap Memory):对象在 JVM 堆上分配并由 GC 绑定。
- 堆外内存管理(外部内存):对象通过序列化在JVM外部的内存中分配,由应用程序管理,不受GC约束。
整体的JVM结构如下所示:
图片
通常,对象的读写速度为:on-heap > off-heap > disk
2.1 内存管理
Spark 内存管理分为两种类型:静态内存管理器(Static Memory Management,SMM),以及统一内存管理器(Unified Memory Management,UMM)。
图片
在Spark1.6.0之前只有一种内存管理方案,即Static Memory Management,但是从 Spark 1.6.0 开始,引入Unified Memory Manager 内存管理方案,并被设置为 Spark 的默认内存管理器,从代码中开始发现(以下代码是基于spark 2.4.8)。
// Determine whether to use the old memory management mode
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
// The old version uses static memory management
new StaticMemoryManager(conf, numUsableCores)
} else {
// The new version uses unified memory management
UnifiedMemoryManager(conf, numUsableCores)
}
而在最新的Spark 3.x开始, Static Memory Management由于缺乏灵活性而已弃用,在源码中已经看到关于Static Memory Management的所有代码,自然也就看不到控制内存管理方案选择的spark.memory.useLegacyMode这个参数。
2.2 静态内存管理器(SMM)
虽然在spark 3.x版本开始SMM已经被淘汰了,但是目前很多企业使用的spark的版本还有很多是3.x之前的,因此我觉得为了整个学习的连贯性,还是有必要说一下的静态内存管理器 (SMM) 是用于内存管理的传统模型和简单方案,该方案实现上简单粗暴,将整个内存区间分成了:存储内存(storage memory,)、执行内存(execution memory)和其他内存(other memory)的大小在应用程序处理过程中是固定的,但用户可以在应用程序启动之前进行配置。这三部分内存的作用及占比如下:storage memory:主要用于缓存数据块以提高性能,同时也用于连续不断地广播或发送大的任务结果。通过spark.storage.memoryFraction进行配置,默认为0.6。
private def getMaxStorageMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
其中又可以分成两部分:预留区域:这部分主要是为了防止OOM,大概占了存储区域中的10%,由参数spark.storage.safetyFraction控制;可用的存储区域:该区域主要是为了缓存RDD的数据和Broadcast数据,大概占了存储区域的90%。另外该区域中并不是所有的内存都用于以上作用,还单独拎出来一部分区域用于缓存iterator形式的block数据,我们称之为Unroll区域,由参数spark.storage.unrollFraction控制,大概占了可用的存储区域的20%,如下:
private val maxUnrollMemory: Long = {
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
execution memory:在执行shuffle、join、sort和aggregation时,用于缓存中间数据。通过spark.shuffle.memoryFraction进行配置,默认为0.2。
private def getMaxExecutionMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (executorMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
从代码中我们可以看到,可执行内存也分成了两个部分:预留部分和可用部分,类似存储内存学习,这里不在赘述。other memory:除了以上两部分的内存,剩下的就是用于其他用作的内存,默认为0.2。这部分内存用于存储运行Spark系统本身需要加载的代码与元数据。因此,关于SMM的整体分配图如下:
图片
基于此就会产生不可逾越的缺点:即使存储内存有可用空间,我们也无法使用它,并且由于执行程序内存已满,因此存在磁盘溢出。(反之亦然)。另外一个最大的问题就是:SMM只支持堆内内存(On-Heap),不支持对外内存(Off-Heap)
补充知识1:在Spark的存储体系中,数据的读写是以块(Block)为单位,也就是说Block是Spark存储的基本单位,这里的Block和Hdfs的Block是不一样的,HDFS中是对大文件进行分Block进行存储,Block大小是由dfs.blocksize决定的;而Spark中的Block是用户的操作单位,一个Block对应一块有组织的内存,一个完整的文件或文件的区间端,并没有固定每个Block大小的做法。每个块都有唯一的标识,Spark把这个标识抽象为BlockId。BlockId本质上是一个字符串,但是在Spark中将它保证为"一组"case类,这些类的不同本质是BlockID这个命名字符串的不同,从而可以通过BlockID这个字符串来区别BlockId
补充知识2:内存池是Spark内存的抽象,它记录了总内存大小,已使用内存大小,剩余内存大小,提供给MemoryManager进行分配/回收内存。它包括两个实现类:ExecutionMemoryPool和StorageMemoryPool,分别对应execution memory和storage memory。当需要新的内存时,spark通过memoryPool来判断内存是否充足。需要注意的是memoryPool以及子类方法只是用来标记内存使用情况,而不实际分配/回收内存。
2.3 统一内存管理器(UMM)
从 Spark 1.6.0 开始,采用了新的内存管理器来取代静态内存管理器,并为 Spark 提供动态内存分配。它将内存区域分配为由存储和执行共享的统一内存容器。当未使用执行内存时,存储内存可以获取所有可用内存,反之亦然。如果任何存储或执行内存需要更多空间,则会调用acquireMemory方法将扩展其中一个内存池并收缩另一个内存池。因此,UMM相比SMM的内存管理优势明显:存储内存和执行内存之间的边界不是静态的,在内存压力的情况下,边界会移动,即一个区域会通过从另一个区域借用空间来增长。当应用程序没有缓存并且正在进行时,执行会使用所有内存以避免不必要的磁盘溢出。当应用程序有缓存时,它将保留最小存储内存,以便数据块不受影响。此内存管理可为各种工作负载提供合理的开箱即用性能,而无需用户了解内存内部划分方式的专业知识。
2.3.1 堆内存
默认情况下,Spark 仅使用堆内存。Spark 应用程序启动时,堆内存的大小由 --executor-memory 或 spark.executor.memory 参数配置。在UMM下,spark的堆内存结构图如下:
图片
我们发现大体上和SMM没有太大的区别,包括每个区域的功能,只是UMM在Storage和Execution可以弹性的变化(这一点也是spark rdd中“弹性”的体现之一)。
备注:在 Spark 1.6 中,spark.memory.fraction 值为 0.75,spark.memory.storageFraction 值为 0.5。从spark 2.x开始spark.memory.fraction 值为 0.6。
2.3.1.1 System Reserved:系统预留
预留内存是为系统预留的内存,用于存储Spark的内部对象。从 Spark 1.6 开始,该值为 300MB。这意味着 300MB 的 RAM 不参与 Spark 内存区域大小计算。预留内存的大小是硬编码的,如果不重新编译 Spark 或设置 spark.testing.reservedMemory,则无法以任何方式更改其大小,一般在实际的生产环境中不建议修改此值。
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
从源码中我们可以看出,如果执行程序内存小于保留内存的 1.5 倍(1.5 * 保留内存 = 450MB),则 Spark 作业将失败,并显示以下异常消息:
24/03/20 13:55:51 ERROR repl.Main: Failed to initialize Spark session.
java.lang.IllegalArgumentException: Executor memory 314572800 must be at least 471859200. Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration.
at org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:225)
at org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:199)
2.3.1.2 其他内存(或称用户内存)
其他内存是用于存储用户定义的数据结构、Spark 内部元数据、用户创建的任何 UDF 以及 RDD 转换操作所需的数据(如 RDD 依赖信息等)的内存。例如,我们可以通过使用 mapPartitions 转换来重写 Spark 聚合,以维护一个哈希表以运行此聚合,这将消耗所谓的其他内存。此内存段不受 Spark 管理,计算公式为:(Java Heap - Reserved Memory) * (1.0 - spark.memory.fraction)。
2.3.1.3 Spark内存(或称统一内存)
Spark Memory 是由 Apache Spark 管理的内存池。Spark Memory 负责在执行任务(如联接)或存储广播变量时存储中间状态。计算公式为:(Java Heap - Reserved Memory) * spark.memory.fraction。
Spark 任务在两个主要内存区域中运行:
- Executor Memory:用于随机播放、联接、排序和聚合。
- Storage Memory:用于缓存数据分区。
它们之间的边界由 spark.memory.storageFraction 参数设置,默认为 0.5 或 50%。
1)StorageMemory: 存储内存
存储内存用于存储所有缓存数据、广播变量、unroll数据等,“unroll”本质上是反序列化序列化数据的过程。任何包含内存的持久性选项都会将该数据存储在此段中。Spark 通过删除基于最近最少使用 (LRU) 机制的旧缓存对象来为新缓存请求清除空间。缓存的数据从存储中取出后,将写入磁盘或根据配置重新计算。广播变量存储在缓存中,具有MEMORY_AND_DISK持久性级别。这就是我们存储缓存数据的地方,这些数据是长期存在的。
计算公式:
(Java Heap - Reserved Memory) * spark.memory.fraction * spark.memory.storageFraction
2)Execution Memory:执行内存
执行内存用于存储 Spark 任务执行过程中所需的对象。例如,它用于将映射端的shuffle中间缓冲区存储在内存中。此外,它还用于存储hash聚合步骤的hash table。如果没有足够的可用内存,执行内存池还支持溢出磁盘,但是其他线程(任务)无法强制逐出此池中的block。执行内存往往比存储内存寿命更短。每次操作后都会立即将其逐出,为下一次操作腾出空间。
计算公式:
(Java Heap - Reserved Memory) * spark.memory.fraction * (1.0 - spark.memory.storageFraction)
由于执行内存的性质,无法从此池中强制逐出块;否则,执行将中断,因为找不到它引用的块。但是,当涉及到存储内存时,可以根据需要从内存中逐出block并写入磁盘或重新计算(如果持久性级别为MEMORY_ONLY)。
存储和执行池借用规则:
- 只有当执行内存中有未使用的块时,存储内存才能从执行内存中借用空间。
- 如果块未在存储内存中使用,则执行内存也可以从存储内存中借用空间。
- 如果存储内存使用执行内存中的块,并且执行需要更多内存,则可以强制逐出存储内存占用的多余块
- 如果存储内存中的块被执行内存使用,而存储需要更多的内存,则无法强行逐出执行内存占用的多余块;它将具有更少的内存区域。它将等到 Spark 释放存储在执行内存中的多余块,然后占用它们。
案例:计算 5 GB 执行程序内存的内存
为了计算预留内存、用户内存、spark内存、存储内存和执行内存,我们将使用以下参数:
spark.executor.memory=5g
spark.memory.fractinotallow=0.6
spark.memory.storageFractinotallow=0.5
那么会得到如下结论:
Java Heap Memory = 5 GB
= 5 * 1024 MB
= 5120 MB
Reserved Memory = 300 MB
Usable Memory = (Java Heap Memory - Reserved Memory)
= 5120 MB - 300 MB
= 4820 MB
User Memory = Usable Memory * (1.0 * spark.memory.fraction)
= 4820 MB * (1.0 - 0.6)
= 4820 MB * 0.4
= 1928 MB
Spark Memory = Usable Memory * spark.memory.fraction
= 4820 MB * 0.6
= 2892 MB
Spark Storage Memory = Spark Memory * Spark.memory.storageFraction
= 2892 MB * 0.5
= 1446 MB
Spark Execution Memory = Spark Memory * (1.0 - spark.memory.storageFraction)
= 2892 MB * ( 1 - 0.5)
= 2892 MB * 0.5
= 1446 MB
2.3.2 堆外内存
堆外内存是指将内存对象(序列化为字节数组)分配给 JVM堆之外的内存,该堆由操作系统(而不是JVM)直接管理,但存储在进程堆之外的本机内存中(因此,它们不会被垃圾回收器处理)。这样做的结果是保留较小的堆,以减少垃圾回收对应用程序的影响。访问此数据比访问堆存储稍慢,但仍比从磁盘读取/写入快。缺点是用户必须手动处理管理分配的内存。此模型不适用于 JVM 内存,而是将 malloc() 中不安全相关语言(如 C)的 Java API 直接调用操作系统以获取内存。由于此方法不是对 JVM 内存进行管理,因此请避免频繁 GC。此应用程序的缺点是内存必须写入自己的逻辑和内存应用程序版本。Spark 1.6+ 开始引入堆外内存,可以选择使用堆外内存来分配 Unified Memory Manager。
默认情况下,堆外内存是禁用的,但我们可以通过 spark.memory.offHeap.enabled(默认为 false)参数启用它,并通过 spark.memory.offHeap.size(默认为 0)参数设置内存大小。如:
spark-shell \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=5g
堆外内存支持OFF_HEAP持久性级别。与堆上内存相比,堆外内存的模型相对简单,仅包括存储内存和执行内存。
如果启用了堆外内存,Executor 中的 Execution Memory 是堆内的 Execution 内存和堆外的 Execution 内存之和。存储内存也是如此。
总之,Spark内存管理的核心目标是在有限的内存资源下,实现数据缓存的最大化利用和执行计算的高效进行,同时尽量减少由于内存不足导致的数据重算或内存溢出等问题,是整个spark允许可以稳定运行的基础保障。