引言
本文是关于Spark优化性能与内存使用的最佳实践,翻译整理自Tuning - Spark 3.3.2 Documentation。由于spark内存计算的特性,很多因素都会影响Spark的表现:CPU、网络带宽或者内存。一般来说,数据可以全部装入内存,则带宽是瓶颈;有时你需要进行调优,主要是两个方面:数据序列化和内存使用。
数据序列化
在分布式应用中数据序列化扮演着至关重要的角色。序列化对象的速度很慢,或者消耗大量字节的格式,会大大降低计算速度。通常情况下,这将是你优化Spark应用时首先要调整的东西。Spark的目标是在易用性(允许你在操作中使用任何Java类型)和性能之间取得平衡。它提供了两个序列化库:
Java serialization:默认是这个,Java序列化很灵活,但往往相当慢,而且导致许多类的序列化格式很大。
Kryo serialization:Spark也可以使用Kyro库更快地序列化对象。Kryo明显比Java序列化更快、更紧凑(通常高达10倍),但不支持所有的Serializable类型,并要求你提前注册你将在程序中使用的类以获得最佳性能。
使用Kryo注册并不是想象中十分晦涩难懂的操作,多数情况仅需一行代码就行!
可以在 SparkConf 里设置conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
来初始化Kryo
。建议在网络密集型应用里使用Kyro序列化。从Spark2.0开始,在Shuffle RDD阶段的一些简单类型已经自动使用了Kyro序列化。
想要注册自定义类使用Kyro,只需如下操作:
val conf = new SparkConf().setMaster(...)......
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
如果代码对象很大,你需要增大spark.kryoserializer.buffer
配置。如果你没有注册你的自定义类,Kryo仍然会生效,但是它不得不随对象存储全类名,这很浪费资源。
内存调优
这部分将首先概述Spark的内存管理,然后讨论用户可以采取的具体策略,以便在我们的应用程序中更有效地利用内存。特别是,我们将描述如何确定你的对象的内存使用情况,以及如何通过改变你的数据结构改善它,或通过以序列化的格式存储数据。然后,我们将介绍调整Spark的缓存大小和Java的垃圾收集器。
内存管理概述
众所周知的是Spark的内存主要分为2大块:执行与存储(execution and storage)。执行内存就是计算用的,如shuffle/join/sort这些,存储内存则用于缓存和跨集群传递数据。在Spark中这两块内存是统一区域管理的,名为M。当没有执行内存需求时,存储内存可以获取全部内存,反之亦然。执行可以在必要时驱逐存储占用的内存空间,直到存储内存占用降低至某一界限R。换言之,R描述了M中的一个子区域,其中缓存的块永远不会被驱逐。由于执行中的复杂性,存储可能不会驱逐执行内存。
这种设计确保了几个理想的特性。首先,不使用缓存的应用程序可以使用整个空间来执行,避免了不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小的存储空间(R),其数据块不会被驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用的性能,而不需要用户对内存的内部划分有专业认识。
虽然有两个相关的配置,但一般用户应该不需要调整,因为默认值适用于大多数工作负载。
spark.memory.fraction
将M的大小表示为(JVM堆空间-300MB)的一部分(默认为0.6)。其余的空间(40%)被保留给用户数据结构、Spark的内部元数据,以及在记录稀少和异常大的情况下对OOM错误的保护。
spark.memory.storageFraction
表示R占M多大一部分(默认为0.5)。R是M中的存储空间,其中的缓存块对执行的驱逐免疫。
确定内存消耗
并没有一个放之四海而皆准的公式告诉你RDD占用了多少内存,对一个具体业务需要实践出真知。
确定一个数据集所需的内存消耗量的最佳方法是创建一个RDD,将其放入缓存,并查看Web UI中的 "Storage "页面。该页面将告诉你该RDD占用了多少内存。
要估计一个特定对象的内存消耗,可以使用SizeEstimator’s estimate
方法。这对于试验不同的数据布局以修整内存使用量,以及确定一个广播变量在每个执行器堆上所占用的空间是很有用的。
调整数据结构
减少内存消耗的第一个方法是避免那些增加开销的Java特性,如基于指针的数据结构和包装对象。有几种方法可以做到这一点。
- 将你的数据结构设计成倾向于对象的数组和原始类型,而不是标准的Java或Scala集合类(例如
HashMap
)fastutil库为原始类型提供了方便的集合类,与Java标准库兼容。 - 尽可能避免使用带有大量小对象和指针的嵌套结构。
- 考虑使用数字ID或枚举对象而不是字符串作为键。
- 如果你的RAM少于32GiB,设置JVM标志-XX:+UseCompressedOops,使指针为四字节而不是八字节。你可以在
spark-env.sh
中添加这些选项。
RDD序列化存储
当你的对象仍然太大,无法有效地存储,尽管有这样的调整,减少内存使用的一个更简单的方法是以序列化的形式存储它们,使用RDD持久化API中的序列化存储级别,如MEMORY_ONLY_SER
。然后,Spark将把每个RDD分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点是访问时间较慢,因为必须在运行中对每个对象进行反序列化。如果你想以序列化的形式缓存数据,我们强烈建议你使用Kryo
,因为它导致的大小比Java序列化小得多(当然也比原始Java对象小)。
GC的调整
当你的程序所存储的RDD有很大的 "流失 "时,JVM的垃圾回收可能是一个问题。(在只读取一次RDD,然后对其进行许多操作的程序中,这通常不是一个问题)。当Java需要驱逐旧对象为新对象腾出空间时,它需要追踪你所有的Java对象并找到未使用的对象。这里需要记住的要点是,垃圾收集的成本与Java对象的数量成正比,所以使用对象较少的数据结构(例如,用Ints
数组代替LinkedList
)可以大大降低这一成本。一个更好的方法是以序列化的形式持久化对象,如上所述:现在每个RDD分区将只有一个对象(一个字节数组)。在尝试其他技术之前,如果GC是一个问题,首先要尝试的是使用序列化的缓存。
由于你的任务的工作内存(运行任务所需的空间量)和你的节点上缓存的RDD之间的干扰,GC也可能成为一个问题。我们将讨论如何控制分配给RDD缓存的空间以缓解这一问题。
测量GC的影响
GC调整的第一步是收集关于垃圾收集发生频率和花费在GC上的时间的统计数据。这可以通过在Java选项中添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps来实现。下次运行Spark作业时,你会看到每次发生GC时,工作节点的日志中都会打印出信息。请注意,这些日志将出现在集群的工作节点上(在其工作目录的stdout文件中),而不是在你的驱动程序上。
高级GC调优
为了进一步调整GC,我们首先需要了解一些关于JVM中内存管理的基本信息。
Java的堆空间被分为两个区域 Young 和 Old。Young代是用来存放临时的对象的,而Old代是用来存放寿命较长的对象的。
年轻一代又被划分为三个区域[Eden, Survivor1, Survivor2]。
对GC行为的简化描述:当Eden满时,在Eden上运行一个小的GC,Eden和Survivor1中活着的对象被复制到Survivor2。Survivor区域被交换。如果一个对象足够老或者Survivor2已经满了,它就会被移到Old。最后,当Old接近满的时候,一个Full GC
被调用。
Spark中GC调整的目标是确保只有长期存在的RDD被存储在Old一代,而Young一代有足够的大小来存储短期对象。这将有助于避免全面GC来清理任务执行过程中创建的临时对象。一些可能有用的步骤是。
- 通过收集GC统计信息,检查是否有太多的垃圾收集。如果在一个任务完成之前多次调用
Full GC
,这意味着没有足够的内存可用于执行任务。
- 如果有太多的小GC但没有太多的大GC,为Eden分配更多的内存会有帮助。你可以将Eden的大小设置为对每个任务所需内存的高估值。如果Eden的大小被确定为E,那么你可以使用选项
-Xmn
=4/3*E来设置Young generation的大小。(按4/3的比例增加是为了考虑幸存者区域所使用的空间)。 - 在打印的GC统计中,如果OldGen接近满了,通过降低
spark.memory.fraction
来减少用于缓存的内存量;缓存更少的对象比减缓任务的执行要好。另外,也可以考虑减少Young代的大小。这意味着降低-Xmn
,如果你已经如上设置。如果没有,可以尝试改变JVM的NewRatio参数的值。许多JVM将其默认为2,这意味着老一代占据了2/3的堆。它应该足够大,以至于这个分数超过了spark.memory.fraction
。 - 尝试设置
-XX:+UseG1GC
来使用G1GC垃圾收集器。在垃圾收集是一个瓶颈的情况下,它可以提高性能。注意,对于大的执行器堆大小,用-XX:G1HeapRegionSize
增加G1区域大小可能很重要。 - 举个例子,如果你的任务是从HDFS读取数据,任务使用的内存量可以用从HDFS读取的数据块的大小来估计。请注意,解压后的块的大小往往是块的2或3倍。因此,如果我们希望有3或4个任务的工作空间,而HDFS块的大小是128MiB,我们可以估计Eden的大小是43128MiB。
- 监控垃圾收集的频率和时间在新的设置下如何变化。
我们的经验表明,GC调整的效果取决于你的应用程序和可用的内存量。网上还描述了许多调优选项,但在高层次上,管理完全GC发生的频率可以帮助减少开销。
可以通过在作业的配置中设置 spark.executor.defaultJavaOptions 或 spark.executor.extraJavaOptions 来指定执行器的 GC 调整设置。
其他考虑因素
并行度水平
除非你把每个操作的并行度设置得足够高,否则集群不会得到充分的利用。Spark会根据文件的大小自动设置在每个文件上运行的 map
任务的数量(当然你可以通过SparkContext.textFile等的可选参数来控制),而对于分布式的 "reduce "操作,比如groupByKey
和reduceByKey
,它会使用最大的父RDD的分区数量。你可以把并行程度作为第二个参数传递(见spark.PairRDDFunctions文档),或者设置配置属性spark.default.parallelism
来改变默认值。一般来说,我们建议在你的集群中每个CPU核有2-3个任务。
输入路径上的并行Listing
有时,当作业输入有大量的目录时,你可能还需要增加目录列表的并行性,否则这个过程可能会花费很长的时间,特别是在针对S3这样的对象存储时。如果你的作业在具有Hadoop输入格式的RDD上工作(例如,通过SparkContext.sequenceFile),则通过spark.hadoop.mapreduce.input.fileinputformat.list-status.num-reads(目前默认为1)控制并行性。
对于具有基于文件的数据源的Spark SQL,你可以调整spark.sql.sources.parallelPartitionDiscovery.threshold和spark.sql.sources.parallelPartitionDiscovery.parallelism,以提高列举并行性。更多细节请参考Spark SQL性能调优指南。
Reduce任务的内存使用情况
有时,你会得到OutOfMemoryError
,不是因为你的RDDs不适合在内存中,而是因为你的某个任务的工作集,比如groupByKey
中的一个Reduce
任务太大。Spark的shuffle操作(sortByKey、groupByKey、reduceByKey、join等)在每个任务中建立一个哈希表来执行分组,而这个哈希表往往会很大。这里最简单的解决方法是提高并行化水平,使每个任务的输入集更小。Spark可以有效地支持短至200毫秒的任务,因为它在许多任务中重复使用一个执行器JVM,而且它的任务启动成本很低,所以你可以安全地将并行化水平提高到超过集群中的核心数量。
广播大型变量
使用SparkContext
中的广播功能可以大大减少每个序列化任务的大小,以及在集群中启动作业的成本。如果你的任务中使用了驱动程序中的任何大型对象(例如静态查询表),可以考虑将其变成一个广播变量。Spark在主程序上打印每个任务的序列化大小,所以你可以看一下,以决定你的任务是否太大;一般来说,大于20KiB的任务可能值得优化。
数据位置
数据位置可以对Spark作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,那么计算往往会很快。但如果代码和数据是分开的,一个必须移动到另一个。通常情况下,将序列化的代码从一个地方运送到另一个地方要比运送一大块数据快,因为代码的大小比数据小得多。Spark围绕这个数据定位的一般原则建立了它的调度。
数据定位是指数据离处理它的代码有多近。根据数据的当前位置,有几个级别的定位。按照从最近到最远的顺序:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY。
Spark通常的做法是等待一下,希望有一个繁忙的CPU腾出手来。一旦超时,它就开始把数据从远处移到空闲的CPU上。每个级别之间的回退等待超时可以单独配置,也可以在一个参数中全部配置;详见spark.locality
参数。如果你的任务很长,看到的定位性很差,你应该增加这些设置,但默认值通常很好用。
小结
这份简短指南指出了你在调整Spark应用程序时应该知道的主要问题——最重要的是数据序列化和内存调整。对于大多数程序来说,切换到Kryo
序列化并以序列化的形式持久化数据将解决大多数常见的性能问题。