Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 1535|回复: 0

[默认分类] Spark程序运行常见错误解决方法以及优化

[复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2020-8-7 16:24:53 | 显示全部楼层 |阅读模式
    一.org.apache.Spark.shuffle.FetchFailedException


    1.问题描述


    这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,非常的耗时。


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/9e8d969fbb1d43c0829b218d73950209/xoom8spy820.jpeg


    2.报错提示


    (1) missing output location


    org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0  


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/165c551964184939a51744b8cafd39e7/zn8qxask174.jpeg


    (2) shuffle fetch faild


    org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268  


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/dcde506838ad4e92be6f4a5f2c0e70a5/gyx-27wc975.jpeg


    当前的配置为每个executor使用1cpu,5GRAM,启动了20个executor


    3.解决方案


    一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。


    spark.executor.memory 15G
    spark.executor.cores 3
    spark.cores.max 21


    启动的execuote数量为:7个


    execuoteNum = spark.cores.max/spark.executor.cores  


    每个executor的配置:


    3core,15G RAM  


    消耗的内存资源为:105G RAM


    15G*7=105G  


    可以发现使用的资源并没有提升,但是同样的任务原来的配置跑几个小时还在卡着,改了配置后几分钟就结束了。


    二.Executor&Task Lost


    1.问题描述


    因为网络或者gc的原因,worker或executor没有接收到executor或task的心跳反馈


    2.报错提示


    (1) executor lost


    WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)  


    (2) task lost


    WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed  


    (3) 各种timeout


    java.util.concurrent.TimeoutException: Futures timed out after [120 second  


    ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong  


    3.解决方案


    提高 spark.network.timeout 的值,根据情况改成300(5min)或更高。


    默认为 120(120s),配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性


    spark.core.connection.ack.wait.timeout
    spark.akka.timeout
    spark.storage.blockManagerSlaveTimeoutMs
    spark.shuffle.io.connectionTimeout
    spark.RPC.askTimeout or spark.rpc.lookupTimeout


    三.倾斜


    1.问题描述


    大多数任务都完成了,还有那么一两个任务怎么都跑不完或者跑的很慢。


    分为数据倾斜和task倾斜两种。


    2.错误提示


    (1) 数据倾斜


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/75fe361c65df4f12aa31376d269f9e05/_lxfyzak842.jpeg


    (2) 任务倾斜


    差距不大的几个task,有的运行速度特别慢。


    3.解决方案


    (1) 数据倾斜


    数据倾斜大多数情况是由于大量null值或者""引起,在计算前过滤掉这些数据既可。


    例如:


    sqlContext.sql("...where col is not null and col != """)  


    (2) 任务倾斜


    task倾斜原因比较多,网络io,cpu,mem都有可能造成这个节点上的任务执行缓慢,可以去看该节点的性能监控来分析原因。以前遇到过同事在spark的一台worker上跑R的任务导致该节点spark task运行缓慢。


    或者可以开启spark的推测机制,开启推测机制后如果某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。


    spark.speculation true


    spark.speculation.interval 100 - 检测周期,单位毫秒;


    spark.speculation.quantile 0.75 - 完成task的百分比时启动推测


    spark.speculation.multiplier 1.5 - 比其他的慢多少倍时启动推测。


    四.OOM(内存溢出)


    1.问题描述


    内存不够,数据太多就会抛出OOM的Exeception


    因为报错提示很明显,这里就不给报错提示了。。。


    2.解决方案


    主要有driver OOM和executor OOM两种


    (1) driver OOM


    一般是使用了collect操作将所有executor的数据聚合到driver导致。尽量不要使用collect操作即可。


    (2) executor OOM


    1.可以按下面的内存优化的方法增加code使用内存空间


    2.增加executor内存总量,也就是说增加spark.executor.memory的值


    3.增加任务并行度(大任务就被分成小任务了),参考下面优化并行度的方法


    优化


    1.内存


    当然如果你的任务shuffle量特别大,同时rdd缓存比较少可以更改下面的参数进一步提高任务运行速度。


    spark.storage.memoryFraction - 分配给rdd缓存的比例,默认为0.6(60%),如果缓存的数据较少可以降低该值。


    spark.shuffle.memoryFraction - 分配给shuffle数据的内存比例,默认为0.2(20%)


    剩下的20%内存空间则是分配给代码生成对象等。


    如果任务运行缓慢,JVM进行频繁gc或者内存空间不足,或者可以降低上述的两个值。


    "spark.rdd.compress","true" - 默认为false,压缩序列化的RDD分区,消耗一些cpu减少空间的使用


    如果数据只使用一次,不要采用cache操作,因为并不会提高运行速度,还会造成内存浪费。


    2.并行度


    spark.default.parallelism  


    发生shuffle时的并行度,在standalone模式下的数量默认为core的个数,也可手动调整,数量设置太大会造成很多小任务,增加启动任务的开销,太小,运行大数据量的任务时速度缓慢。


    spark.sql.shuffle.partitions  


    sql聚合操作(发生shuffle)时的并行度,默认为200,如果任务运行缓慢增加这个值。


    相同的两个任务:


    spark.sql.shuffle.partitions=300:  


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/aaf022f84e484cec9552059c9f77aa02/drfzu2ay458.jpeg


    spark.sql.shuffle.partitions=500:  


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/7b905d67032647fca6f8cbdcc5d2c754/fmmrk2qq156.jpeg


    速度变快主要是大量的减少了gc的时间。


    修改map阶段并行度主要是在代码中使用rdd.repartition(partitionNum)来操作。


      


      


      


    ========================================================================================


      


    五、踩坑SPARK之容错机制


    2018-04-15 14


    SPARK的容错


    问题
    SPARK的任务重启
    SPARK 2.1.0的坑


    SPARK的容错


    这块机制其实还不是太明白,很多都是看的
    这位兄弟的博客,这里说说今天遇到的问题以及踩到的坑。


    问题


    最近在调一个spark程序,因为数据量太大,存在一些性能障碍。之前join的问题已经解决(过两天把这个方案补上)。一直以为这样就解决问题了,但通过新数据的测试,发现耗时仍然可能特别严重。一个问题是几个任务的GC时间过长,导致整体运行时间特别长(这个问题没有得到复现,如果再次遇到的话,目前只能通过一些已有GC方案解决);另一个问题是,即使没有大的GC耗时,计算时间依旧很感人(一个任务大约要4~5h,是不大能接受的)。


    为了加速任务,在队列资源不是特别紧张的前提下,我决定加一些机器。具体做法就是加num-executors, executor-memory以及executor-cores,然后把default-parallelism增大。一开始观察,速度的确有加快的样子,估计可以提速一倍(也是应该的,毕竟资源也加了一倍左右)。但是当任务跑到1/4的时候,突然出现了一个意外:一个executor突然挂了!


    SPARK的任务重启


    之前没有仔细研究过,executor挂了spark会怎么处理。因为毕竟已经跑了一些结果了,总不能从头开始再重跑一遍吧。


    首先我们来看一下driver日志报的问题:


    FetchFailed(BlockManagerId(301, some_port, 7391, None), shuffleId=4, mapId=69, reduceId=1579, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to some_port


    原来是某个exetutor想要fetch数据(应该是shuffle read),但那个有数据的executor挂了,导致fetch失败。为啥我知道executor挂了?我是通过spark-ui看到的。


    我们想一下,那个executor挂了的话,有什么后果?那个executor上存着上个stage算好的数据,然后这个stage的任务会依赖那些数据,所以这个会影响到很多这个stage的任务。


    下面分三个角度看这个stage的任务:这个stage已经算好的任务,应该是不需要重新计算的;这个stage未启动的任务暂时不受到影响;这个stage已经启动但未完成的任务是什么影响呢?这个我们稍后再说。


    先看spark在知道executor挂了之后做了些什么事情?假设我们当前的stage是9号stage,默认叫做9.0;现在因为那个executor挂了,这个stage不能顺利继续下去了。所以,spark就重启一个新的stage,叫做9.1。由于已经算好的就不要算了,所以任务数量就是之前的总量减去已经计算完成的数量。对于9.0已经启动但未完成的任务,9.1仍然会重启,但似乎二者之前没有进行沟通。


    下面来看,那个executor已经算好的数据现在丢失了,spark要怎么做?由于spark的rdd之间的“血缘”关系,可以根据那个executor上rdd的生成方法,再重新算一遍就好了。这个只涉及到那个executor上的数据,所以开销会很小,但有可能会重新计算多个stage(我遇到的是分钟级别的重新计算)。


    讲道理,有了这个容错重启任务的机制,分钟级别的重算不会带来很大的额外时间开销的。但通过spark ui观察,在那个9.0的时候,并行的任务有近1000个,现在9.1的并行任务只剩下300~400个,速度变得很慢,加了资源就跟没加一样,这怎么能忍?


    回头看一下executor的log,积累了一段时间发现,很多executor一直在报这个:


    java.io.IOException: Connection from some_port closed18/04/15 08:15:09 INFO RetryingBlockFetcher: Retrying fetch (1/30) for 20 outstanding blocks after 10000 ms18/04/15 08:15:09 ERROR OneForOneBlockFetcher: Failed while starting block fetchesjava.io.IOException: Connection from some_port closed18/04/15 08:15:09 INFO RetryingBlockFetcher: Retrying fetch (1/30) for 20 outstanding blocks after 10000 ms18/04/15 08:15:09 ERROR OneForOneBlockFetcher: Failed while starting block fetchesjava.io.IOException: Connection from some_port closed18/04/15 08:15:09 INFO RetryingBlockFetcher: Retrying fetch (1/30) for 20 outstanding blocks after 10000 ms18/04/15 08:15:19 INFO TransportClientFactory: Found inactive connection to some_port, creating a new one.18/04/15 08:17:26 INFO TransportClientFactory: Found inactive connection to some_port, creating a new one.18/04/15 08:17:26 ERROR RetryingBlockFetcher: Exception while beginning fetch of 20 outstanding blocks (after 1 retries)


    闲着无聊的我,就这样看了两个小时,这个错才消停。仔细观察,好像这个在尝试30次fetch数据。不过fetch的数据源就是那个已经挂了的executor,既然已经挂了,还一直在那儿尝试,不是有毛病嘛。


    另外一个问题就是,为毛要重试30次这么多?感觉这应该是个配置,那就在spark ui的environment的tab里面搜一搜30。哈哈,果然有个30个在那儿:


    spark.shuffle.io.maxRetries: 30


    看着名字,应该就是它了!spark一直在尝试fetch那个挂了的executor上的数据,一直要尝试30次!然后还有一个对应的参数:spark.shuffle.io.retryWait=10s,这个表示两次retry之间的间隔。知道这个问题后,查了一下文档,发现官方默认的retry次数是3次,不知道哪个运维把默认参数改成了30!还应该挨千刀的是,retryWait也从默认的5s改成了10s。跑得慢的原因很明显了,就是这两个参数,导致很多executor在进行无谓的挣扎,想要从一个挂了的executor上取数,也就是两个小时,一半以上的executor的资源都浪费了。


    但是稍等,讲道理30次乘以10s,最多也就浪费300s,也就是5min,怎么会浪费2h呢?这里我猜想:


    18/04/15 08:15:19 INFO TransportClientFactory: Found inactive connection to some_port, creating a new one.


    这里应该是一个提示,当我发现那个executor没法连接上的时候,就想着重新建立一个连接。但毕竟那个节点已经挂了,必然一直没有回应,那就需要等待连接超时。连接超时时间很长,比如是5min,那算下来这个时间也就差不多要两个小时了。


    SPARK 2.1.0的坑


    那么问题又来了,spark怎么这么傻?明明那个exector挂了,还是要做尝试。难道driver不能告知每个executor:那个挂了,不要去那里取数了,已经起了的任务就结束吧。通过查询多方资料才知道,原来早先设计的人好像没有考虑到这一点。下面是一些jira和github的issue,都是在吐槽这个问题:


    [url=https://issues.apache.org/jira/browse/SPARK-20178https://issues.apache.org/jira/browse/SPARK-20230[url=https://github.com/apache/spark/pull/17088]https://github.com/apache/spark/pull/17088[/url]]https://issues.apache.org/jira/browse/SPARK-20178https://issues.apache.org/jira/browse/SPARK-20230[url=https://github.com/apache/spark/pull/17088]https://github.com/apache/spark/pull/17088[/url][/url]


    看样子是17年5月才close掉这个问题,所以可能要到spark 2.3.0里面才会修复这个问题。也算是踩了一次2.1.0版本spark的坑,不过公司里spark是不能随意升级的,以后还是需要手动处理这个问题,比如自己设置retry次数。不过一个可能的坑是,这个参数的描述是:


    This retry logic helps stabilize large shuffles in the face of long GC pauses or transient network connectivity issues.


    所以GC如果是个问题的话,可能还要往上调。


    另外一个收获就是知道了(Netty only)在文档里的意思,这个网络通信库看来我们用得很随意呀。


      


      


      


    六、Spark性能调优之合理设置并行度


    1.Spark的并行度指的是什么?


         spark作业中,各个stage的task的数量,也就代表了spark作业在各个阶段stage的并行度!


         当分配完所能分配的最大资源了,然后对应资源去调节程序的并行度,如果并行度没有与资源相匹配,那么导致你分配下去的资源都浪费掉了。同时并行运行,还可以让每个task要处理的数量变少(很简单的原理。合理设置并行度,可以充分利用集群资源,减少每个task处理数据量,而增加性能加快运行速度。)


      


         举例:


             假如, 现在已经在spark-submit 脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor ,每个executor 有10G内存,每个executor有3个cpu core 。 基本已经达到了集群或者Yarn队列的资源上限。


    task没有设置,或者设置的很少,比如就设置了,100个task 。 50个executor ,每个executor 有3个core ,也就是说


    Application 任何一个stage运行的时候,都有总数150个cpu core ,可以并行运行。但是,你现在只有100个task ,平均分配一下,每个executor 分配到2个task,ok,那么同时在运行的task,只有100个task,每个executor 只会并行运行 2个task。 每个executor 剩下的一个cpu core 就浪费掉了!你的资源,虽然分配充足了,但是问题是, 并行度没有与资源相匹配,导致你分配下去的资源都浪费掉了。合理的并行度的设置,应该要设置的足够大,大到可以完全合理的利用你的集群资源; 比如上面的例子,总共集群有150个cpu core ,可以并行运行150个task。那么你就应该将你的Application 的并行度,至少设置成150个,才能完全有效的利用你的集群资源,让150个task ,并行执行,而且task增加到150个以后,即可以同时并行运行,还可以让每个task要处理的数量变少; 比如总共 150G 的数据要处理, 如果是100个task ,每个task 要计算1.5G的数据。 现在增加到150个task,每个task只要处理1G数据。


    2.如何去提高并行度?


        1、task数量,至少设置成与spark Application 的总cpu core 数量相同(最理性情况,150个core,分配150task,一起运行,差不多同一时间运行完毕)官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍 ,比如150个cpu core ,基本设置 task数量为 300~ 500. 与理性情况不同的,有些task 会运行快一点,比如50s 就完了,有些task 可能会慢一点,要一分半才运行完,所以如果你的task数量,刚好设置的跟cpu core 数量相同,可能会导致资源的浪费,因为 比如150task ,10个先运行完了,剩余140个还在运行,但是这个时候,就有10个cpu core空闲出来了,导致浪费。如果设置2~3倍,那么一个task运行完以后,另外一个task马上补上来,尽量让cpu core不要空闲。同时尽量提升spark运行效率和速度。提升性能。


         2、如何设置一个Spark Application的并行度?


           spark.defalut.parallelism   默认是没有值的,如果设置了值比如说10,是在shuffle的过程才会起作用(val rdd2 = rdd1.reduceByKey(_+_) //rdd2的分区数就是10,rdd1的分区数不受这个参数的影响)


           new SparkConf().set(“spark.defalut.parallelism”,”“500)


      


         3、如果读取的数据在HDFS上,增加block数,默认情况下split与block是一对一的,而split又与RDD中的partition对应,所以增加了block数,也就提高了并行度。


         4、RDD.repartition,给RDD重新设置partition的数量


         5、reduceByKey的算子指定partition的数量


                      val rdd2 = rdd1.reduceByKey(_+_,10)  val rdd3 = rdd2.map.filter.reduceByKey(_+_)


         6、val rdd3 = rdd1.join(rdd2)  rdd3里面partiiton的数量是由父RDD中最多的partition数量来决定,因此使用join算子的时候,增加父RDD中partition的数量。


         7、spark.sql.shuffle.partitions //spark sql中shuffle过程中partitions的数量


      


      


      


    七、自己总结:


      


      


      


    1、项目当中遇到的问题以及解决方式:


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/b7cb23dbf2794b3dbf2570c674b5495f/rak_su%60%7B%28k_pgwwn%25%5Dfz%28q0.png


    一开始设置的查询mysql的分割并行查询是8,结果就遇到了程序反复移除和重新生成executor,但是任务不能完成,一个晚上还没有跑完。后来在clouder manager里面查看yarn的监控。发现其每个nodemanager的分配的executor非常不均匀。然后计算了一下executor的总数是8个。


      


    看下面这个图,我在spark任务的提交参数里面设置的executor实例是9个,对不上的。


    最终才把mysql的查询并行设置成30(结果很快任务就跑完了,估计是并行度增加每个executor分到的数据少了,没有gc了,也就不存在反复重试,重试不行还重启executor)。然后再yarn里面看到是启动了30个container。有15个是同时运行的。就是一个container(包含executor)才一个cpu核心,对应的每个executor的内存也没有我设置的大。


    请参照下面这个图。这个是我的设置,说明它没有参照我的设置进行资源分配,而是根据查询mysql的并行度进行了资源分配。估计是根据mysql的并行度,产生任务、数据分区,然后是计算。我原来以为是按照默认的并行度64进行的;所以一直没有设置并行度(spark.default.parallelism)这个参数。万万没想到它是根据mysql的查询并行度进行并行的。


    G:/youdaoNote/qq03D8FBFD6CA6844B75B9B71C0CE20583/fbe1cdfc704d4829a37ee47677906308/vq6yom%5D390dm.png


      


      


      


    2、出现问题的原理分析


    综合二.Executor&Task Lost 、 五、踩坑SPARK之容错机制


    可以总结如下:


    A、当数据集超大时(或者是分配不均匀或者分区太少、并行度不够等导致的单个executor内存不够),会造成executor内存不够,频繁gc。


      


    B、频繁的gc或者网络抖动,会造成数据传输超时、心跳超时等问题。


      


    C、由于spark的重试机制,会先根据配置的时间间隔,再次去重试拉取数据。


      


    D、超过重试次数之后,executor会被干掉,重新生成一个executor去重新执行。这样就导致了反复的remove掉executor,然后重新生成。但是任务还是不能完成。


      


      


    3、对spark的重试机制的参数进行设置(尝试次数、尝试间隔、还有各种通信超时时间)


    每次尝试失败都是要等到通信超时,各种时间加起来,反复重试时间会很长


    A、


    spark.shuffle.io.maxRetries: 30 #尝试次数


    spark一直在尝试fetch那个挂了的executor上的数据,一直要尝试30次!


    B、


    spark.shuffle.io.retryWait=10s #这个表示两次retry之间的间隔。


    C、


    spark.network.timeout=300 #配置所有网络传输的延时


    如果没有主动设置以下参数,默认覆盖其属性


    spark.core.connection.ack.wait.timeout
    spark.akka.timeout
    spark.storage.blockManagerSlaveTimeoutMs
    spark.shuffle.io.connectionTimeout
    spark.rpc.askTimeout or spark.rpc.lookupTimeout


      


      


      


      


      


      


    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-23 20:29 , Processed in 0.351162 second(s), 39 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表