Note-39133-5

Token ID: 1

ERC-721 1 Transfers

Metadata

{
  "title": "Spark Job长尾问题排查及小文件优化的思考",
  "tags": [
    "post",
    "大数据",
    "spark"
  ],
  "summary": "最近在测试spark on k8s的时候,遇到了一些性能问题,于是记录一下排查过程,做一下案例的复盘。",
  "sources": [
    "xlog"
  ],
  "external_urls": [
    "https://bladedragon.xlog.app/Spark-Job-chang-wei-wen-ti-pai-cha-ji-xiao-wen-jian-you-hua-de-si-kao"
  ],
  "date_published": "2023-04-15T07:41:56.261Z",
  "content": "但是最近在测试spark on k8s的时候,遇到了一些性能问题,于是记录一下排查过程,做一下案例的复盘。\n\n![20220202163506_2db51](ipfs://bafybeie7x5ndzibs43bzya2q4kq4nyju65pgboyfrwa7ubkibyl3qckghu)\n\n\n## 案例再现\n我们使用的底层集群是AWS的eks集群,在此之上搭建了一套传统的hadoop yarn+ spark环境。通俗来说就是将yarn的resourceManager,nodeManager等具体的组件转化成k8s的pod,但是上层调度逻辑不变,做了一套两层调度的系统。该系统的具体实现方案和调度逻辑这里按下不表,因为这和今天讲述的性能优化案例关系不大,我们只要了解到这是传统的yarnspark,但是底层是k8s就好。\n我采用的任务是来自我们生产环境的一个小任务,期望运行时间5min,数据量中等,输出表数据大约在2kw行。原先生产环境采用的是AWS的EMR on EC2集群,可以理解是传统的容器主机集群。现在将该任务迁移到eks集群后,时长达到了43min,于是观察其history ui:\n\n![IMG_export_20230415_163937706](ipfs://bafkreibupv2ba5te6y553nuzxljhjzmh2fovkcnufla57vwpp5kjduqwni)\n\n\n![IMG_export_20230415_163947995](ipfs://bafkreif5z3lsatlsla4dpvydzxzbqqskhs3ntpdllk3qot4pksjmooyrdy)\n\n发现其sql执行的阶段耗时与生产环境的耗时基本类似,阶段执行时长只有几分钟,但是整个spark job完成耗时却用了43min,明显在job执行尾部存在耗时问题,导致长尾现象发生。\n\n## 初步分析\n由于采用生产任务一样的sql,读写数据量也完全一致,排除业务逻辑导致的影响;\n排查driver日志,发现阶段结束后几乎没有有效日志,此时所有task已经执行完成,观察集群,executor的利用率也非常低。\n既然存在耗时,后台一定有耗时的线程在运行,于是查看spark driver的thread dump\n\n\n找到的真正的原因发现卡在s3的rename操作上\n\n\n## s3上rename操作带来的性能问题\n**首先谈一下rename。** spark的rename是指在spark提交作业过程中,为了保持数据的一致性,会生成临时文件来读写数据,当task执行完毕,会将临时文件rename为正式文件;当job执行完毕,会把该job的临时目录下的所有文件rename为正式文件\n其目录格式大概如图所示\n\n![image](ipfs://bafkreifer6vdsqnj5djinv7hu5jif42cxtdkfssut6ibtehegsk524m35u)\n\ndriver会通过FileFormatWriter选择合适的output committer并启动writer job,committer会决定如何提交job和task,提交流程如下图\n\n\n![image](ipfs://bafkreihph53o7imaojla572jdrizqevp4mewgtnrfvrknxah2oojczsapq)\n\n\nrename就发生在其中提交job和提交task的环节,具体的rename策略根据committer的策略而定。关于committer的细节等会再提。\n\n**为什么s3上rename会有性能问题** \nAWS 的s3 包括大多数对象存储,目录本身就是一个对象,因此,其目录rename需要经历list-copy-delete 的操作,相对与文件系统例如HDFS简单的rename,其开销自然非常大。在spark运行中,可能会生成非常多的小文件,即使是HDFS,要进行数万计小文件的rename,其性能尚且需要优化,更不要说s3了。\n\n## spark的文件提交协议\n\n在谈及如何优化之前,我们先回顾一下与之相关的Spark文件提交过程。从上一张图可以看到,Spark在job 提交过程中,实际上依然是调用的Hadoop的committer来采取具体的commit的策略。而committer要解决的问题,主要有以下几点:\n1. 处理文件失败重读导致的数据一致性问题\n2. 保证task推测执行下相同文件多写时的数据正确性\n3. 提高海量文件读写,合并的效率\n\n目前Hadoo提供的两种文件提交方式,通过`mapreduce.fileoutputcommitter.algorithm.version`进行切换\n\n### FileOutputCommitter V1\n**commit过程**\n1. 首先TaskAttempt会将TaskAttempt Data写入一个临时目录: `${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}`\n2. 当task data写完,可以提交task后,执行commitTask,将上述目录转移到正式目录:`${target_dir}/_temporary/${appAttempt}/${taskAttempt}`\n3. 当所有task 完成,执行commitJob, 将`${target_dir}/_temporary`下所有文件及目录转移到`${target_dir}`正式目录,并在提交完成后当前目录添加标识符 `_SUCCESS`来表示标识提交成功\n\n\n**数据一致性问题**\n1. 在TaskAttempt写入的阶段,如果发生task写失败需要重试,只需要重写`${taskAttempt}`目录下`/_temporary/`下的所有文件就行,可以保留原先正式的Attempt目录\n2. 如果发生application重试,也可以通过recoverTask直接恢复原先`${appAttempt}`目录下的正式目录文件,直接重命名到当前`${appAttempt}`目录下\n3. 由于存在两次rename,V1实际上是两阶段提交,rename前后数据的一致性都能得到保证,数据不一致的情况只有可能发生在rename的过程中\n\n**性能问题**\nV1的强一致性带来的负面作用就是两次rename的操作在海量文件生成的情景中可能导致耗时问题,尤其是commitJob阶段,由于是Driver单线程串行执行commit,此时如果需要rename大量文件, 其耗时可能会非常长\n\n### FileOutputCommitter V2\n1. 首先TaskAttempt会将TaskAttempt Data写入一个临时目录: `${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}`\n2. 当task data写完,可以提交task后,执行commitTask,将上述目录转移到 `${target_dir}`. 注意这里是直接移动到正式目录\n3. 当所有task 完成,由于所有data已经保存在正式目录下,所以commitJob就是单纯添加标识符 `_SUCCESS`来表示标识提交成功\n\n   \n**数据一致性问题**\n1.在taskAttempt写入的阶段,如果发生task写失败重试,由于此时可能task data已经被移动到正式目录,因此此时会出现脏数据\n2.如果发生application重试,由于之前application已提交的数据已经存在在正式目录,因此无需额外的重命名操作,直接继续进行其他数据的重试即可,当然,此时已提交的数据不一定完全正确,其中可能存在脏数据。\n3.可见V2过程牺牲一定的数据一致性,选择了最终一致性的方案,由于缺乏中间过程对数据正确性的保证,所以只能通过最后的_SUCCESS标识符来决定数据是否正确。同时,这还会带来另一个问题,由于存在脏数据,在任务长期运行中,这些脏数据可能无法被正确清理,从而占据存储空间,带来额外的开销\n\n**性能问题**\nV2之所以采取最终一致性的方案,目录就是减少V1 rename操作过多带来的耗时开销。相比V1,V2只需要在task完成后rename到正式目录,而且可以通过task线程的并行操作,其执行的时长会被大大降低\n\n\n## 小文件优化\n\n虽然上述的Committer的不同算法在一致性和性能上给了大家选择,但毕竟各有利弊。但在实际场景下,大家的选择总是希望“我全都要”\n\n![fm=173&fmt=auto&h=212&img_JPEG=&s=198008D41E4200570CB830AA0300E012&u=1102103420,837649793&w=393](ipfs://bafkreidf2numutt76wkoef5somfunozvhdiod4hxaem5sw4wwgtuaewlcm)\n\n除了在rename阶段进行优化外,性能杀手的源头:对海量小文件的优化也成为了一个行之有效的方法。\n### Spark现有的优化:\n在spark中内置有对小文件的优化,从文件生成角度:\n+ `spark.files.openCostInBytes` 利用这个参数设置预估打开文件的大小,设置高一点可以提高小文件分区的速度\n+ \n\n从业务侧考虑,大致思路是减少分区数来将小文件合并成大文件\n+ 使用coalesce或repartition操作合并分区。\n+ 减少使用动态分区或者使用 `distribute by`来控制reducer个数\n+ 多使用分区表来降低查询时产生的分区数量\n+ 或者使用更先进的文件压缩格式来提高小文件处理性能\n\n### AWS的特殊优化:\n由于我们在生产环境中使用了AWS 的EMR,也稍微了解了一下AWS团队在s3上小文件优化上的措施\n1. multi upload:其原理就是利用并发读写文件片段来提高处理s3读写的性能,基于此,衍生出EMRFS S3-optimized Committer和s3a Committer(开源),注意,该committer默认采用FileOutputCommitter V2方式提交,因此V2存在的问题在这些committer上也都会存在。\n3. 利用hdfs加速,在EMR中,考虑到文件系统对于rename等操作具有更好的性能,那我们不是可以在文件系统上先rename,再提交到s3上?在EMR中,就是在文件提交到s3前,先上传到类hdfs的文件系统上进行rename或者文件合并操作后再上传到s3上,这样比起纯s3读写,在性能上会有明显收益。当然坏处就是单独维护一个文件系统具有较高成本。\n\n### 其他优化思路:\n我们团队也在小文件合并上进行了优化,其优化思路就是在Job执行的最后,新建一个job用于合并小文件,通过继承Spark的`SqlHadoopMapReduceCommitProtocol`来实现插件式的扩展\n合并的思路是在commitTask之后,获取数据的分区信息,然后进行分组合并,最后在commitJob的时候直接将合并完的文件转移到正式目录中。其基本思路如下图\n\n![application-and-practice-of-spark-small-file-mergi1](ipfs://bafkreiaq5bqan5vj2rqxuje456csjokgg2r42mpyxg2v32rvi2yxguuxva)\n\n这样合并小文件的优点\n+ 该功能是插拔式的,对原生代码的侵入性较低\n+ 在海量小文件场景下优势明显\n\n  缺点\n  + 新起一个job进行优化,在任务最后会新增两个阶段用于小文件合并,会引入更多的task,带来一定的耗时\n\n\n## 尾声\n通过启动该功能,我重新跑一遍任务,最终耗时下降明显降低:\n\n![IMG_export_20230416_174701801](ipfs://bafkreie3g6j5zq7chnvidimwo3avjj3ov3b3uy4hiierujwa3i7yugojpe)\n\n\n\n当然优化并未完全结束,在eks上的任务耗时总体还是要比原EMR任务高,但这块问题的深入排查,等待下次有时间再分享吧",
  "attributes": [
    {
      "value": "Spark-Job-chang-wei-wen-ti-pai-cha-ji-xiao-wen-jian-you-hua-de-si-kao",
      "trait_type": "xlog_slug"
    }
  ]
}