一、hadoop&hive&spark官方文档
官网文档永远是最好的指导手册
spark2.2参数配置官方文档/sparksql参数配置文档/spark最新版本官方文档
二、常用spark参数具体含义
参数分类 | 场景 | 参数 | 公司集群默认值 | 参数含义 |
---|---|---|---|---|
executor申请&并行度 | 一般需要大数量下,需要提升任务并行度时可以考虑修改这些参数 | spark.dynamicallocation.enabled | true | 是否开启动态资源分配,平台默认开启,同时强烈建议用户不要关闭。理由:开启动态资源分配后,spark可以根据当前作业的负载动态申请和释放资源 |
spark.dynamicallocation.maxexecutors | 1000 | 开启动态资源分配后,同一时刻,最多可申请的executor个数。平台默认设置为1000。当在spark ui中观察到task较多时,可适当调大此参数,保证task能够并发执行完成,缩短作业执行时间。但同时申请过多的executor会带来资源使用的开销,所以要多方面考虑来设置(可以参考万金油版参数的设置思路) | ||
spark.dynamicallocation.minexecutors | 3 | 和maxexecutors相反,此参数限定了某一时刻executor的最小个数。平台默认设置为3,即在任何时刻,作业都会保持至少有3个及以上的executor存活,保证任务可以迅速调度。部分小任务有时会出现申请不到资源而一直等待,可以尝试设置该参数为1,减少pending的概率 | ||
spark.dynamicallocation.initialexecutors |
| 初始化的时候的executor数量,仅在动态资源分配时生效 | ||
spark.executor.instances |
| 初始化的时候的executor数量,动态和非动态资源分配均生效 | ||
spark.executor.cores | 1 | 单个executor上可以同时运行的task数。spark中的task调度在线程上,该参数决定了一个executor上可以并行执行几个task。这几个task共享同一个executor的内存(spark.executor.memory+spark.yarn.executor.memoryoverhead)。适当提高该参数的值,可以有效增加程序的并行度,是作业执行的更快,但会使executor端的日志变得不易阅读,同时增加executor内存压力,容易出现oom,所以一般需要配合的增加executor的内存。在作业executor端出现oom时,如果不能增大spark.executor.memory,可以适当降低该值。平台默认设置为1。 该参数是executor的并发度,和spark.dynamicallocation.maxexecutors配合,可以提高整个作业的并发度。 | ||
内存分配 | 一般出现了内存溢出,可以考虑修改这些参数 | spark.executor.memory | 2g | executor用于缓存数据、代码执行的堆内存以及jvm运行时需要的内存。当executor端由于oom时,多数是由于spark.executor.memory设置较小引起的。该参数一般可以根据表中单个文件的大小进行估计,但是如果是压缩表如orc,则需要对文件大小乘以2~3倍,这是由于文件解压后所占空间要增长2~3倍。平台默认设置为2g。 |
spark.yarn.executor.memoryoverhead | 1024 | spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。spark根据spark.executor.memory+spark.yarn.executor.memoryoverhead的值向rm申请一个容器,当executor运行时使用的内存超过这个限制时,会被yarn kill掉,最大值是16gb | ||
spark.driver.memory | 10g | driver使用内存大小, 平台默认为10g,根据作业的大小可以适当增大或减小此值。一般有大表的广播可以考虑增加这个数值 | ||
spark.yarn.driver.memoryoverhead | spark.driver.memory * 0.1,并且不小于384m | 类似于spark.yarn.executor.memoryoverhead,即driver java进程的off-heap内存 | ||
spark.memory.fraction | 0.3 | 存储+执行内存占节点总内存的大小,社区版是0.6。平台为了方便的把hive任务迁移到spark任务,把该区域的内存比例调小至0.3,给other区留取更大的内存空间(udf的影响)。 个人建议非udf的任务中可以调整到0.6,减少spill的次数,提升性能 | ||
spark.memory.storagefraction | 0.5 | 内存模型中存储内存占存储+执行内存的比例,由于在同一内存管理下可以动态的占用,该参数保持不变即可 | ||
spark.sql.windowexec.buffer.spill.threshold | 40960 | 当用户的sql中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘。该参数如果设置的过小,会导致spark频繁写磁盘,如果设置过大则一个窗口中的数据全都留在内存,有oom的风险。但是,为了实现快速读入磁盘的数据,spark在每次读磁盘数据时,会保存一个1m的缓存。 举例:当spark.sql.windowexec.buffer.spill.threshold为10时,如果一个窗口有100条数据,则spark会写9((100 - 10)/10)次磁盘,在读的时候,会创建9个磁盘reader,每个reader会有一个1m的空间做缓存,也就是说会额外增加9m的空间。 当某个窗口中数据特别多时,会导致写磁盘特别频繁,就会占用很大的内存空间作缓存。因此如果观察到executor的日志中存在大量“spilling data because number of spilledrecords crossed the threshold”日志,则可以考虑适当调大该参数,平台默认该参数为40960。 | ||
文件输入输出与合并 | 当出现map端数据倾斜,map端由于小文件启动大量task,或者结果生成大量小文件时,可以考虑修改这些参数 | spark.hadoop.hive.exec.orc.split.strategy |
| bi策略以文件为粒度进行split划分;etl策略会将文件进行切分,多个stripe组成一个split;hybrid策略为:当文件的平均大小大于hadoop最大split值(默认256m)时使用etl策略,否则使用bi策略。该参数只对orc格式生效 注意:当etl策略生效时,如果输入文件的数量以及每个文件的stripe数量过多,有可能会导致driver压力过大,出现长时间计算不出task数量,甚至oom的情况。 当bi策略生效时,也有可能会出现输入数据倾斜。 |
spark.hadoop.mapreduce.input.fileinputformat.split.minsize |
| map端输入文件的切分和合并参数,可以把小文件进行合并,大文件进行切割 其中spark.hadooprdd.targetbytesinpartition是美团自己开发的参数,社区版不存在。这个参数的值用来标识一个hadooprdd的大小,当同分区的多个文件小于参数设置值时,可以进行合并(注意:不能跨分区合并) spark.hadoop.mapreduce.input.fileinputformat.split.minsize spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 这两个参数控制了单个文件的切分和合并大小,跨文件、跨分区不行 maxsize控制了split的最大值,minsize控制了最小值 一些测试结果如下
从有限的测试结果中发现,hadooprdd参数要比另两个参数表现好 | ||
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize |
| |||
spark.hadooprdd.targetbytesinpartition |
| |||
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 2 | 文件提交算法,mapreduce-4815 详细介绍了 fileoutputcommitter 的原理,version=2 是批量按照目录进行提交,version=1是一个个的按照文件提交。设置 version=2 可以极大的节省文件提交至hdfs的时间,减轻nn压力。 | ||
spark.sql.adaptive.shuffle.targetpostshuffleinputsize |
| 开启spark.sql.adaptive.enabled后,最后一个stage在进行动态合并partition时,会根据shuffle read的数据量除以该参数设置的值来计算合并后的partition数量。所以增大该参数值会减少partition数量,反之会增加partition数量。 注意:对于shuffle read数据量很大,但是落地文件数很小无法很好的处理,例如:小表left join大表 | ||
spark.sql.mergesmallfilesize |
| 与 hive.merge.smallfiles.avgsize 类似,写入hdfs后小文件合并的阈值。如果生成的文件平均大小低于该参数配置,则额外启动一轮stage进行小文件的合并 | ||
spark.sql.targetbytesinpartitionwhenmerge |
| 与hive.merge.size.per.task 类似,当额外启动一轮stage进行小文件的合并时,会根据该参数的值和input数据量共同计算出partition数量 | ||
mapjoin | 想使用自动mapjoin时,需要考虑的参数 | spark.sql.autobroadcastjointhreshold | 26214400 | 当执行join时,小表被广播的阈值。当被设置为-1,则禁用广播。表大小需要从 hive metastore 中获取统计信息。该参数设置的过大会对driver和executor都产生压力。 注意,由于我们的表大部分为orc压缩格式,解压后的数据量3-5倍甚至10倍 |
spark.sql.statistics.fallbacktohdfs | false | 当从hive metastore中没有获取到表的统计信息时,返回到hdfs查看其存储文件大小,如果低于spark.sql.autobroadcastjointhreshold依然可以走mapjoin。该参数在社区版默认是true,由于会增大对hdfs的压力,被平台改成false了,个人建议打开 | ||
spark.sql.broadcasttimeout | 7200 | 在broadcast join时 ,广播等待的超时时间 | ||
shuffle阶段 | shuffle阶段的调优,出现fetchfailed可以考虑修改 | spark.sql.shuffle.partitions | 2000 | reduce阶段(shuffle read)的数据分区,分区数越多,启动的task越多(1:1),“一般”来说速度会变快,同时生成的文件数也会越多。 个人建议一个partition保持在256mb左右的大小就好 |
spark.sql.adaptive.enabled | true | 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行。平台默认开启,同时强烈建议开启。理由:更好利用单个executor的性能,还能缓解小文件问题。 | ||
spark.shuffle.adaptive.all | fasle | 美团独有的参数,true表示所有stage都会动态调整partition,false表示只有最后一个stage开启。目前不建议打开,但如果只是聚合操作的话可以打开。如果有很多join会增加额外的shuffle | ||
spark.sql.adaptive.shuffle.targetpostshuffleinputsize |
| 开启spark.sql.adaptive.enabled后,最后一个stage在进行动态合并partition时,会根据shuffle read的数据量除以该参数设置的值来计算合并后的partition数量。所以增大该参数值会减少partition数量,反之会增加partition数量。 注意:对于shuffle read数据量很大,但是落地文件数很小无法很好的处理,例如:小表left join大表 | ||
spark.sql.adaptive.minnumpostshufflepartitions |
| 开启spark.sql.adaptive.enabled后,合并之后保底会生成的分区数 | ||
spark.shuffle.service.enabled | true | 启用外部shuffle服务,这个服务会安全地保存shuffle过程中,executor写的磁盘文件,因此executor即使挂掉也不要紧,必须配合spark.dynamicallocation.enabled属性设置为true,才能生效,而且外部shuffle服务必须进行安装和启动,才能启用这个属性 | ||
spark.reducer.maxsizeinflight | 24m | 同一时刻一个reducer可以同时拉取的数据量大小 | ||
spark.reducer.maxreqsinflight | 10 | 同一时刻一个reducer可以同时产生的请求数 | ||
spark.reducer.maxblocksinflightperaddress | 1 | 同一时刻一个reducer向同一个上游executor最多可以拉取的数据块数 | ||
spark.reducer.maxreqsizeshuffletomem | 536870911 | shuffle请求的文件块大小 超过这个参数值,就会被强行落盘,防止一大堆并发请求把内存占满,社区版默认long.maxvalue,美团默认512m | ||
spark.shuffle.io.connectiontimeout | 120s | 客户端超时时间,超过该时间会fetchfailed | ||
spark.shuffle.io.maxretries | 3 | shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 | ||
推测执行 | 推测执行相关的参数,一般不需要特别关注 | spark.speculation | true | spark推测执行的开关,作用同hive的推测执行。 (注意:如果task中有向外部存储写入数据,开启推测执行则可能向外存写入重复的数据,要根据情况选择是否开启) |
spark.speculation.interval | 1000ms | 开启推测执行后,每隔多久通过checkspeculatabletasks方法检测是否有需要推测式执行的tasks | ||
spark.speculation.quantile | 0.8 | 当成功的task数超过总task数的spark.speculation.quantile时(社区版默认75%,公司默认80%),再统计所有成功的tasks的运行时间,得到一个中位数,用这个中位数乘以spark.speculation.multiplier(社区版默认1.5,公司默认3)得到运行时间门限,如果在运行的tasks的运行时间超过这个门限,则对它启用推测。 如果资源充足,可以适当减小spark.speculation.quantile和spark.speculation.multiplier的值 | ||
spark.speculation.multiplier | 3 | 解释见上面spark.speculation.quantile | ||
谓词下推 | 如果出现谓词没有下推,可以考虑修改这些参数 | spark.sql.parquet.filterpushdown |
| parquet格式下的谓词下推开关 |
spark.sql.orc.filterpushdown |
| orc格式下的谓词下推开关(此处有疑惑,尝试关闭后发现还是下推了) | ||
spark.sql.hive.metastorepartitionpruning | true | 当为true,spark sql的谓语将被下推到hive metastore中,更早的消除不匹配的分区(此处有疑惑,尝试关闭后发现还是下推了) |
发表评论