Spark Applicaiton 开发遇到的问题
本文基于:Java 1.8,Spark 1.6环境
Transform 线程安全问题
NotSerializableException 异常分析及解决方法
在使用spark开发分布式数据计算作业过程中或多或少会遇到如下的错误:
1 |
|
分析
Spark处理的数据单元为RDD(即弹性分布式数据集),当我们对RDD进行一些transform操作(map,filter等操作)是由分布在不同机器上的executor来完成的。Spark不移动数据RDD,而是分布式地分发闭包算法。如果我们在driver中定义了一个变量,在map等操作中使用,则这个变量就要被分发到各个executor,因为driver和executor的运行在不同的jvm中,势必会涉及到对象的序列化与反序列化。如果这个变量没法序列化就会报异常。还有一种情况就是引用的对象可以序列化,但是引用的对象本身引用的其他对象无法序列化,也会有异常。
异常信息中Serialization stack给我们定位问题提供很多信息。
解决方法
以下构造一个例子:
UnserializableClass没有继承Serializable接口,有成员函数method,静态成员函数staticMethod
1 |
|
写一个测试类,如下:
1 |
|
第一个第二个测试用例会报错,如下Serialization stack信息:
1 |
|
由Serialization stack信息定位到我们调用UnserializableClass类对象方法时出错,相应修改办法:
-
将不可序列化的对象定义在闭包内
1
2
3
4
5@Test public void testNewUnserializableClassMethod() { JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); javaRDD.map(x -> new UnserializableClass().method(x)).collect().forEach(System.out::println); }
-
使用static修饰方法,即将方法改成类方法,如下代码。使用static或transient修饰不可序列化的属性从而避免序列化
1
2
3
4
5@Test public void testUnserializableClassStaticMethod() { JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); javaRDD.map(x -> UnserializableClass.staticMethod(x)).collect().forEach(System.out::println); }
-
给无法序列化的类加上java.io.Serializable接口,如下代码:
1
2
3
4
5
6
7package com.spark.examples.unserializabledemo; public class SerializableClass implements java.io.Serializable { public int method(int x) { return x * x; } }
注册序列化类无法解决NotSerializableException问题
1 |
|
Lambda表达式序列化问题
问题起因:使用Spark的Java接口做filter操作的过程中,遇到了匪夷所思的错误,问题大概可以抽象成下面一段代码:
1 |
|
这段代码中创建了两个List,一个String的,一个Integer的,然后调用java.util.Objects.nonNull()方法分别执行过滤掉null的逻辑。
这段代码是能够通过编译的,但是一旦运行,就会出现下面的异常:
1 |
|
可以看到,在list1上的过滤操作甚至都没有真正执行(没有使用Spark的shuffle操作),那么这个问题是怎么产生的呢?最初怀疑是method-ref的引用问题,是不是说在method-ref委派的过程中,由于应用到的类型不同,产生了二义性问题呢?使用下面的代码测试:
1 |
|
这段代码使用JDK自带的stream替代Spark进行相同的逻辑,结果是不但正确通过编译,执行结果也正常,执行结果如下:
1 |
|
这就否定了之前的假设,那么究竟是什么导致了异常呢?仔细思考,Java8的Stream和Spark究竟有什么本质的不同呢:都是并行处理框架,但是Spark是分布式的,分布式的涉及到网络传输,这必然涉及到数据处理任务可能会通过网络进行传输(Spark确实会把任务广播到各个节点上),因此一定会涉及到Task的序列化,会不会是序列化出现了问题?
为了验证这个猜测,构造了如下的代码:
1 |
|
这段代码先将Objects::nonNull向下转为Function<Integer,Boolean>类型
反序列化回来的过程中反序列化为Function<String,Boolean>类型
Object.nonNull()方法的原型如下:
1 |
|
所以对类型是没有要求的,是不是说明这段代码可以正确运行呢?我们执行这段代码:
1 |
|
java.lang.ClassCastException重现了。
这说明:
lambda表达式在序列化的过程中,不同于Java泛型的运行时擦除机制,会对类型进行特化,序列化前后的lambda表达式是携带类型信息的。
之后我又参考了R大在知乎上关于Lambda表达式与序列化的一个回答,证明了之前的假设。
问题解决:
在Spark中,同一个method-ref可能会绑定到同一个serializable lambda,再次重用如果类型不匹配就会引发异常;如果使用匿名函数的形式,匿名函数可以匹配到对应的类型,类型不同对应就是两个serializable lambda,因此不会引发问题。
1 |
|
爆内存相关问题汇总及解决
OOM
1 |
|
除了 exit code 137
外其它OOM提示都很明显,yarn container 137退出码按照SO的大神说:“Exit code 137 is a typical sign of the infamous OOM killer.”
解决方法:
- 加 executor 内存(spark.executor.memory),需注意on yarn时进程是按最小container memory的整数倍分配的。
- 优化程序内存占用
- 设置StorageLevel 到 DISK 或 MEMORY AND DISK,要注意persist只在action执行才生效,所以建议先count或isEmpty一下触发persist,然后再去做自己的flatMap/foreach等业务操作
Ref: hadoop-streaming-job-failure-task-process-exit-with-nonzero-status-of-137
Shuffle Read OOM
1 |
|
以上皆为可能的报错(但不代表报错一定就是OOM相关,要去找cause的exception;OOM不过是其中一种可能的cause),大致原因是shuffle后的executor读取数据超出了内存限制,然后挂了并且清除了相关的中间临时文件,其他的executor在尝试与其进行数据沟通时,要么executor丢失,要么无法读取其写出的shuffle文件等。当然FetchFailedException还有可能是其他原因,譬如某台机太繁忙无法响应等,这种情况可以尝试调整相关timeout参数进行尝试: spark.shuffle.io.maxRetries=6 ,spark.files.fetchTimeout=120s 。
如果你的单个shuffle block超过2g,然后又报类似以上列举的错误,你可能遭遇了以下 issue :
解决办法:
- 调大 repartition 数,减少每个repartition的size
- 调大executor内存
- on yarn的需调大 spark.executor.overheadMemory,按SO的说法,需要自己根据实际情况测试调到不报错为止
- 根据实际情况调整 spark.shuffle 的相关参数。shuffle参数中大多数是关于shuffle write和shuflling的配置,而且基本大多数默认都是比较优的配置了。唯一shuffle read相关的
spark.reducer.maxMbInFlight
涉及源码参见 Spark技术内幕: Shuffle详解(二) ,因为shuffle fetch阶段是边fetch边处理的,所以适当调小该值有助于改善shuffle阶段的内存占用。 shuffle部分参数说明 - 有可能也是你物理内存不够了,增加可用内存吧
- 优化数据结构使用,尽量使用原始类型和数组,泛型和对象会造成较大的传输和存储开销。可考虑利用一些高效的序列化方案,比如protostuff。
Ref:
what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept
fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-set
Streaming OOM
1 |
|
Spark Streaming 中此错误产生原因是streaming产生了堆积,超过了receiver可承受的内存大小,因此旧的未执行的job被删除来去接收新的job。
解决方法:
- 调大 receiver 内存
- kafka 直接消费没有做rdd transform的可考虑换 direct stream ,防止堆积。
- spark 1.6.x 可采用 spark.streaming.backpressure.enabled 机制回压,控制接收速率,防止爆内存。SparkConf设置
spark.streaming.backpressure.enabled=true
,spark.streaming.backpressure.pid.minrate=0.001
Ref:
Insufficient Physical Memory
1 |
|
其实就是没有足够的物理内存去启动这个JVM了,比如你JVM申请5g,实际只剩下4g可用的物理内存,就会报错,然后jvm启动失败进程退出。
解决方法:
- 加物理内存
- 优化程序和数据结构,调低jvm内存需求
- kill掉其他占用系统内存进程释放可用内存
问题:这里的可用内存包不包括操作系统cache的内存呢? (free -m
可查看OS的free和cached内存)
Ref : insufficient-memory-for-the-java-runtime-environment-message-in-eclipse
爆内存相关问题小结
其实以上的很多解决办法基本是OOM大多数问题通用的,比如持久化、内存调大、数据结构优化。
如果以上问题还不能解决,请参考:http://spark.apache.org/docs/latest/tuning.html 中的 Memory Tuning部分