优化Spark作业以获得最佳性能
从表面上看,Spark作业的开发似乎很容易,而且在大多数情况下确实如此。所提供的API设计精良且功能丰富,如果您熟悉Scala集合或Java流,那么您将很快完成实现。在群集上并在满负载下运行它们时,真正困难的部分是,因为并非所有作业在性能上都是相同的。不幸的是,要以最佳方式实施您的工作,您必须对Spark及其内部有相当的了解。
在本文中,我将讨论开发Spark应用程序时可能遇到的最常见的性能问题,以及如何避免或减轻它们。
1.转换 Transformations
使用RDD API时,最常见的性能问题是使用的转换对于特定用例而言是不够的。这可能是由于许多用户熟悉SQL查询语言及其对查询优化的依赖。重要的是要认识到RDD API不会应用任何此类优化。
让我们看一下相同计算的以下两个定义:
1 |
|
RDD | 平均时间 | Min时间 | Max时间 |
---|---|---|---|
定义1 | 2646.3ms | 1570ms | 8444ms |
定义2 | 270.7ms | 96ms | 1569ms |
Lineage (definition1):
1 |
|
Lineage (definition2):
1 |
|
第二个定义比第一个定义快得多,因为它在我们的用例环境中通过不不必要地收集所有元素来更有效地处理数据。
在进行笛卡尔联接并随后对结果数据进行过滤而不是转换为RDD对并使用内部联接时,我们可以观察到类似的性能问题:
1 |
|
RDD | 平均时间 | Min时间 | Max时间 |
---|---|---|---|
定义1 | 9255.3ms | 3750ms | 12077ms |
定义2 | 1525ms | 623ms | 2759ms |
Lineage (definition1):
1 |
|
Lineage (definition2):
1 |
|
经验法则是始终在转换边界处使用最少的数据量。RDD API会尽其所能来优化后台工作,例如任务调度,基于数据局部性的首选位置等。但它不会优化计算本身。实际上,这样做实际上是不可能的,因为每个转换都是由一个不透明的函数定义的,Spark无法查看我们正在使用的数据以及如何处理。
由此可以得出另一条经验法则:使用丰富的转换,即始终在单个转换的上下文中尽可能多地执行操作。用于此目的的有用工具是combineByKeyWithClassTag
:
1 |
|
Lineage :
1 |
|
DateFrames and DataSets
Spark社区实际上认识到了这些问题,并开发了两组高级API来解决此问题:DataFrame和Dataset。这些API附带了有关数据的其他信息,并定义了在整个框架中都可以识别的特定转换。调用动作时,将对计算图进行最优化,并将其转换为相应的RDD图,然后执行该图。
为了演示,我们可以尝试用两种非常不同的方式定义两个等效的计算,并比较它们的运行时间和作业图:
1 |
|
数据框 | 平均时间 | Min时间 | Max时间 |
---|---|---|---|
定义1 | 1598.3ms | 929ms | 2765ms |
定义2 | 1770.9ms | 744ms | 2954ms |
解析的逻辑计划(定义1):
1 |
|
解析的逻辑计划(定义2):
1 |
|
物理计划(定义1):
1 |
|
物理计划(定义2):
1 |
|
优化后,转换的原始类型和顺序无关紧要,这要归功于称为基于规则的查询优化的功能。由于基于成本的查询优化,还考虑了数据大小以正确的方式对作业进行重新排序。最后,DataFrame API还将有关作业实际需要的列的信息推送到数据源读取器,以限制输入读取(这称为谓词下推)。实际上,以与DataFrame API所提供的功能相当的方式编写RDD作业非常困难。
但是,在一个方面,DataFrames表现不佳,并促使创建了另一种表示Spark计算的方法:类型安全。由于仅出于转换定义的目的用名称表示数据列,并且仅在运行时检查它们在实际数据类型方面的有效用法,因此这往往导致冗长的开发过程,我们需要跟踪所有列正确的类型,否则我们在执行过程中会出错。创建数据集API就是为此的解决方案。
Dataset API使用Scala的类型推断和基于隐式的技术来传递Encoders,这是特殊的类,用于描述Spark优化器的数据类型,就像DataFrames一样,同时保留编译时类型以进行类型检查和编写转换自然。如果这听起来很复杂,请参考以下示例:
1 |
|
数据集 | 平均时间 | Min时间 | Max时间 |
---|---|---|---|
定义 | 544.9ms | 472ms | 728ms |
解析的逻辑计划:
1 |
|
物理计划:
1 |
|
后来意识到,可以将DataFrames视为这些Dataset的特例,并且将API统一(使用称为Row的特殊优化类作为DataFrame的数据类型)。
但是,在数据集方面要牢记一个警告。作为开发人员熟悉了采集样RDD API,数据集API提供的最常用的方法自己的变种- filter
,map
和reduce
。这些工作(可以预期)具有任意功能。因此,Spark无法理解此类功能的详细信息,并且由于无法再正确传播某些信息(例如,用于谓词下推),其优化能力也有所削弱。这将在序列化部分中进一步说明。
1 |
|
解析的逻辑计划(DataFrame):
1 |
|
解析的逻辑计划(DataSet):
1 |
|
物理计划(DataFrame):
1 |
|
物理计划(DataSet):
1 |
|
并行转换 Parallel transformations
Spark可以并行运行多个计算。这可以通过在驱动程序上启动多个线程并在每个线程中发出一组转换来轻松实现。然后,结果任务将同时运行并共享应用程序的资源。这样可以确保资源永远不会保持空闲状态(例如,在等待特定转换的最后任务完成时)。默认情况下,任务以FIFO方式(在作业级别)进行处理,但是可以通过使用备用应用程序内调度程序来确保公平(通过将设置spark.scheduler.mode
为FAIR
)来更改此任务。然后,希望线程通过将spark.scheduler.pool
local属性(使用SparkContext.setLocalProperty
)设置为适当的池名称来设置其调度池。然后,应在spark.scheduler.allocation.file
设置定义的XML文件(默认情况下,此fairscheduler.xml
文件位于Spark的conf文件夹中)。
1 |
|
计算方式 | 平均时间 | Min时间 | Max时间 |
---|---|---|---|
串行 | 173.1ms | 140ms | 336ms |
平行 | 141ms | 122ms | 200ms |
2.分区 Partitioning
大多数Spark作业遭受的第二个问题是数据分区不足。为了使我们的计算高效,重要的是将我们的数据划分为足够多的分区,这些分区的大小应尽可能彼此接近(统一),以便Spark可以调度正在执行的各个任务他们以不可知论的方式仍然可以预测。如果分区不统一,则说该分区是倾斜的。发生这种情况可能有多种原因,并且可能发生在我们计算的不同部分。
从数据源读取时,我们的输入可能已经倾斜。在RDD API中,这通常是使用textFile
和wholeTextFiles
方法完成的,它们具有令人惊讶的不同分区行为。该textFile
方法旨在从(通常是较大的)文件中读取文本的每一行,默认情况下将每个输入文件块作为单独的分区加载。它还提供了一个minPartitions
参数,当该参数大于块数时,将尝试进一步拆分这些分区以满足指定值。另一方面,wholeTextFiles
该方法用于读取(通常是较小的)文件的全部内容,将相关文件的块根据其在群集中的实际位置合并到池中,并且默认情况下为每个这些池创建一个分区(有关更多信息,请参见Hadoop的CombineFileInputFormat(在其实现中使用)。minPartitions
在这种情况下,该参数控制这些池的最大大小(等于totalSize/minPartitions
)。所有minPartitions
参数的默认值为2。这意味着,wholeTextFiles
如果不使用默认设置而不在群集上显式管理数据局部性,则使用非常少的分区就容易得多。用于数据读入RDDS其它方法包括其它格式,例如sequenceFile
,binaryFiles
和binaryRecords
,以及通用的方法hadoopRDD
以及newAPIHadoopRDD
采用自定义格式的实现(允许自定义分区)。
分区特性经常在随机边界上变化。因此,暗示随机播放的操作将提供一个numPartitions
指定新分区计数的参数(默认情况下,分区计数与原始RDD中的计数相同)。还可以通过改组引入偏斜,尤其是在加入数据集时。
1 |
|
由于这些情况下的分区完全取决于所选键(特别是其Murmur3哈希值),因此必须注意避免为通用键创建异常大的分区(例如,空键是常见的特殊情况)。一个有效的解决方案是分离相关记录,在它们的键中添加一个salt(随机值),并在多个阶段对其执行后续操作(例如reduce),以获得正确的结果。
1 |
|
有时甚至有更好的解决方案,例如如果其中一个数据集足够小,则使用Map侧联接。
1 |
|
DataFrames and Datasets
高级API共享一种特殊的数据分区方法。将输入文件的所有数据块都添加到公用池中,就像在中一样wholeTextFiles
,但是然后根据两个设置将池划分为分区:spark.sql.files.maxPartitionBytes
,它指定最大分区大小(默认为128MB),而spark.sql.files.openCostInBytes
,则指定估算成本打开一个可能已读取的新文件(以字节为单位)(默认为4MB)。该框架将根据此信息自动找出输入数据的最佳分区。
当涉及到对shuffle进行分区时,令人遗憾的是,缺少高级API(至少从Spark 2.2起)。通过指定spark.sql.shuffle.partitions
设置(默认为200),只能在作业级别上静态指定分区数。
高级API可以自动将联接操作转换为广播联接。这由控制spark.sql.autoBroadcastJoinThreshold
,它指定要广播的表的最大大小(默认为10MB)spark.sql.broadcastTimeout
,并且控制执行者等待广播表的时间(默认为5分钟)。
重新分区
所有的API还提供了两种方法来控制分区的数量。第一个是repartition
强制改组以便在指定数量的分区之间重新分配数据(通过前述的Murmur哈希)。由于混排数据是一项昂贵的操作,因此,如果可能,应避免重新分区。此操作还有更多特定的变体:可排序对RDD具有repartitionAndSortWithinPartitions
可与自定义分区程序一起使用的功能,而DataFrame和Dataset具有repartition
带有列参数的功能来控制分区特性。
所有API提供的第二种方法的coalesce
性能要比repartition
它好得多,因为它不会shuffle数据,而只是指示Spark读取多个现有分区。但是,这只能用于减少分区数,而不能用于更改分区特性。通常没有理由使用它,因为Spark旨在利用大量的小分区,而不是减少输出中的文件数或与之一起使用时的批处理数foreachPartition
(例如,将结果发送到数据库) 。
3.序列化
正确处理的另一件棘手的事情是序列化,它有两种形式:数据序列化和闭包序列化。数据序列化是指对存储在RDD中的实际数据进行编码的过程,而闭包序列化是指与在外部引入计算(例如共享字段或变量)的数据相同的过程。区分这两个很重要,因为它们在Spark中的工作方式非常不同。
数据序列化
Spark支持两种不同的序列化器以进行数据序列化。缺省的是Java序列化,尽管它非常易于使用(仅通过实现Serializable
接口),但效率很低。因此,建议将大多数生产用途切换到第二个受支持的Serializer Kryo。设置spark.serializer
为即可完成此操作org.apache.spark.serializer.KryoSerializer
。Kryo效率更高,并且不需要实现类Serializable
(因为它们由Kryo的FieldSerializer序列化默认)。但是,在极少数情况下,Kryo可能无法序列化某些类,这是为什么它仍不是Spark的默认值的唯一原因。注册所有预期要序列化的类也是一个好主意(Kryo将能够使用索引而不是完整的类名来标识数据类型,从而减少序列化数据的大小,从而进一步提高性能)。
1 |
|
RDD | 平均时间 | Min时间 | Max时间 |
---|---|---|---|
java | 65990.9ms | 64482ms | 68148ms |
kryo | 30196.5ms | 28322ms | 33012ms |
Lineage (Java):
1 |
|
Lineage :
1 |
|
DataFrames and Datasets
当涉及到数据序列化时,高级API效率更高,因为它们知道正在使用的实际数据类型。因此,它们可以生成专门针对这些类型以及Spark在整个计算环境中使用它们的方式量身定制的优化序列化代码。对于某些转换,它也可能仅生成部分序列化代码(例如,计数或数组查找)。此代码生成步骤是Tungsten项目的组成部分,这是使高级API如此有效的重要原因。
值得注意的是,在此过程中,Spark可以从了解所应用转换的属性中受益,因为它可以在整个作业图中传播有关正在使用哪些列的信息(谓词下推)。在转换中使用不透明函数(例如,数据集map
或filter
)时,此信息会丢失。
1 |
|
数据框 | 平均时间 | Min时间 | Max时间 |
---|---|---|---|
tungsten | 1102.9ms | 912ms | 1776ms |
Lineage :
1 |
|
闭包序列化
在大多数Spark应用程序中,不仅需要序列化数据本身。在各个转换中还使用了外部字段和变量。让我们考虑以下代码片段:
1 |
|
在这里,我们使用从应用程序配置中加载的值作为计算本身的一部分。但是,由于转换函数之外发生的所有事情都发生在驱动程序上,因此Spark必须将值传输给相关的执行程序。因此,Spark在其中计算函数的闭包map
由它使用的所有外部值组成,将这些值序列化并通过网络发送。由于闭包可能非常复杂,因此决定只支持Java序列化。因此,闭包的序列化不如数据本身的序列化有效,但是,由于闭包仅针对每个转换的每个执行者而不是每个记录进行序列化,因此通常不会引起性能问题。(但是,实现这些值会有不愉快的副作用Serializable
。)
闭包中的变量很容易跟踪。使用字段可能会造成很多混乱。让我们看下面的例子:
1 |
|
在这里,我们可以看到它a
只是一个变量(与factor
以前一样),因此被序列化为Int
。b
是一个方法参数(也表现为变量),因此也被序列化为Int
。但是c
是一个类字段,因此不能单独序列化。这意味着为了序列化它,Spark需要SomeClass
使用它序列化它的整个实例(因此它必须扩展Serializable
,否则我们将得到一个运行时异常)。对于d
内部将构造函数参数转换为字段的情况也是如此。因此,在这两种情况下,Spark也必须发送的值c
,d
并e
给Executors。如e
序列化可能会非常昂贵,这绝对不是一个好的解决方案。我们可以通过避免闭包中的类字段来解决此问题:
1 |
|
在这里,我们通过将值存储在局部变量中来准备值sum
。然后将其序列化为一个简单的对象,Int
而不会拖动它的整个实例SomeClass
(因此它不再需要扩展Serializable
)。
Spark还定义了一种特殊的构造,以在需要为多个转换序列化相同值的情况下提高性能。它被称为广播变量,在计算之前被序列化并仅发送一次给所有执行者。这对于诸如查找表之类的大变量特别有用。
1 |
|
Spark提供了一个有用的工具来确定内存中对象的实际大小,称为SizeEstimator,它可以帮助我们确定特定对象是否适合广播变量。
4.内存管理 Memory management
对于应用程序而言,重要的是要有效地使用其内存空间。由于每个应用程序的内存要求不同,Spark将应用程序的驱动程序和执行程序的内存划分为多个部分,这些部分由适当的规则控制,并通过应用程序设置将大小大小留给用户。
驱动程序内存
驱动程序的内存结构非常简单。它仅使用其所有已配置的内存(由spark.driver.memory
设置控制,默认为1GB)作为共享堆空间。在群集部署设置中,还增加了开销,以防止YARN因使用过多资源而过早地终止驱动程序容器。
执行器内存
执行程序需要将其内存用于一些主要目的:用于当前转换的中间数据(执行内存),用于缓存的持久性数据(存储内存)以及用于转换的自定义数据结构(用户内存)。由于Spark可以计算每个存储记录的实际大小,因此它可以监视执行和存储部分并做出相应的反应。执行内存的大小通常非常不稳定,需要立即使用,而存储内存的寿命更长,稳定,通常可以逐出磁盘,而应用程序通常只需要整个计算的某些部分(有时根本不需要) )。因此,Spark为两者定义了一个共享空间,从而优先考虑执行内存。所有这些都由几个设置控制:spark.executor.memory
(默认为1GB)定义了可用堆空间的总大小,spark.memory.fraction
设置(默认为0.6)定义了执行和存储共享的内存的一部分(减去300MB缓冲区),并且spark.memory.storageFraction
(默认值为0.5)定义了无法执行的存储部分。以最适合您的应用程序的方式定义它们很有用。例如,如果应用程序大量使用缓存的数据并且不使用过多的聚合,则可以增加存储内存的比例以容纳将所有缓存的数据存储在RAM中,从而加快数据的读取速度。另一方面,如果应用程序使用昂贵的聚合并且不严重依赖缓存,则增加执行内存可以通过逐出不需要的缓存数据来改善计算本身,从而有所帮助。此外,请记住,您的自定义对象必须适合用户内存。
Spark还可以将堆外内存用于存储和部分执行,这由设置spark.memory.offHeap.enabled
(默认为false),spark.memory.offHeap.size
(默认为0)和OFF_HEAP
持久性级别控制。这样可以减轻垃圾收集的暂停。
DataFrames and Datasets
作为Tungsten项目的一部分,高级API使用自己的方式来管理内存。由于数据类型是框架已知的,并且它们的生命周期定义非常明确,因此可以通过预先分配内存块并显式微管理这些块来完全避免垃圾回收。这样可以极大地重用已分配的内存,从而有效地消除了对执行内存进行垃圾回收的需要。这种优化实际上效果很好,以至于启用堆外内存几乎没有其他好处(尽管仍有很多好处)。
5.集群资源 Cluster resources
导致性能降低的最后一个重要点是群集资源分配不足。从低效率地使用数据局部性(通过与散乱的执行程序打交道)到防止在不需要集群资源时浪费集群资源,这种形式有很多种。
数据局部性
为了获得良好的性能,我们的应用程序的计算应尽可能接近实际数据,以避免不必要的传输。这意味着在存储数据本身的计算机上运行执行程序是一个非常好的主意。使用HDFS时,Spark可以以最大程度地提高这种可能性来优化执行程序的分配。但是,我们可以通过良好的设计来进一步提高这一点。
通过增加单个执行程序的资源,同时减少执行程序的总数,我们可以减少所需的节点间通信量,从本质上迫使任务必须由数量有限的节点来处理。以以下示例资源分配为例:
num_executors | executor_cores | executor_memory |
---|---|---|
15 | 1 | 1g |
5 | 3 | 3g |
3 | 5 | 5g |
在所有情况下,我们将使用相同数量的资源(15个内核和15GB内存)。但是,由于我们减少了执行程序的总数,因此也减少了在执行程序之间传输数据的需求。通常使第三个选项最快。另一方面,取决于请求的操作,节点级别的I / O吞吐量可能会受到限制,因此我们不能无限地增加它。例如,对于HDFS I / O,每个执行者的核心数量被认为可以达到大约五个峰值。
当使用spark.locality.wait
设置(默认为3秒)及其子部分(spark.locality.wait
默认情况下相同)从集群中读取数据时,我们还可以调整Spark与局部性相关的配置。这些定义了基于位置的计划的超时(达到时降低了位置限制)。
动态分配
在应用程序范围内对执行程序进行明确的分配可能会有其不利之处。在某些情况下,我们可能不希望在整个计算期间都拥有统一数量的执行程序,而是想要一些扩展。在给定的时间群集上可能没有足够的资源,但是无论如何,我们都希望运行计算,我们可能正在处理一个需要更少资源并且不希望消耗更多资源的转换,依此类推。其中,动态分配的用武之地。
使用动态分配(通过设置spark.dynamicAllocation.enabled
为true 启用),Spark通过尝试分配尽可能多的执行程序来开始每个阶段(最多达到给定阶段的最大并行度spark.dynamicAllocation.maxExecutors
,默认情况下为infinity),其中第一阶段必须至少获得spark.dynamicAllocation.initialExecutors
(与spark.dynamicAllocation.minExecutors
或spark.executor.instances
默认情况下)。
在计算过程中,如果执行程序空闲时间超过spark.dynamicAllocation.executorIdleTimeout
了默认时间(默认情况下为60秒),它将被删除(除非它将执行程序的数量降至以下值spark.dynamicAllocation.minExecutors
(默认情况下为0)。这确保了我们的应用程序在执行时不会不必要地占用集群资源更便宜的改造。
为了能够启用动态分配,我们还必须启用Spark的外部随机播放服务。它充当在群集中每台计算机上运行的独立服务器,当适当的执行程序不再存在(已被删除或丢失)时,它能够管理随机播放文件。这在遗失执行者(例如由于抢占)的情况下也是有益的,因为所讨论的随机数据不必重新计算。
投机执行
有时,即使我们正确地进行了所有操作,由于我们无法控制的情况(与Spark,硬件故障等无关的高负载),我们仍然可能在特定计算机上获得较差的性能。对于这些情况,我们可能会指示Spark在检测到此类散乱者后自动重新执行任务。为此,启用spark.speculation
设置。可以使用以下设置配置检测例程:spark.speculation.interval
定义检查散乱者的频率(默认为100ms),spark.speculation.multiplier
定义散乱者的速度必须慢多少倍(默认为1.5)并spark.speculation.quantile
定义必须执行的任务比例直到检测程序开始运行(默认为0.75)。
结论
如您所见,为性能而设计一个Spark应用程序可能会非常具有挑战性,并且此过程的每一步似乎都在增加复杂性,降低多功能性或延长对特定用例的分析方面付出了代价。幸运的是,由于典型的Spark应用程序对性能并不敏感,因此几乎不需要全部实现。此外,仅通过使用高级API(DataFrames或DataSets)就可以实现很多目标。尽管必须在开发过程的早期就做出使用它们的决定,因为切换它们并非易事。
此外,还有许多其他技术可以帮助进一步提高Spark作业的性能。即GC调整,适当的硬件配置和调整Spark的众多配置选项。