以下是整理的Spark中的一些配置参数,官方文档请参考 Spark Configuration 。
Spark提供三个位置用来配置系统:
Spark属性:控制大部分的应用程序参数,可以用SparkConf对象或者Java系统属性设置
环境变量:可以通过每个节点的
conf/spark-env.sh
脚本设置。例如IP地址、端口等信息
日志配置:可以通过log4j.properties配置
Spark属性控制大部分的应用程序设置,并且为每个应用程序分别配置它。这些属性可以直接在
SparkConf
上配置,然后传递给
SparkContext
。
SparkConf
允许你配置一些通用的属性(如master URL、应用程序名称等等)以及通过
set()
方法设置的任意键值对。例如,我们可以用如下方式创建一个拥有两个线程的应用程序。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
.set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
在一些情况下,你可能想在
SparkConf
中避免硬编码确定的配置。例如,你想用不同的master或者不同的内存数运行相同的应用程序。Spark允许你简单地创建一个空conf。
val sc = new SparkContext(new SparkConf())
然后你在运行时设置变量:
./bin/spark-submit --name "My app" --master local[4] --conf spark.shuffle.spill=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Spark shell和
spark-submit
工具支持两种方式动态加载配置。第一种方式是命令行选项,例如
--master
,如上面shell显示的那样。
spark-submit
可以接受任何Spark属性,用
--conf
参数表示。但是那些参与Spark应用程序启动的属性要用特定的参数表示。运行
./bin/spark-submit --help
将会显示选项的整个列表。
bin/spark-submit
也会从
conf/spark-defaults.conf
中读取配置选项,这个配置文件中,每一行都包含一对以
空格
或者
等号
分开的键和值。例如:
spark.master spark://5.6.7.8:7077
spark.executor.memory 512m
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
任何标签指定的值或者在配置文件中的值将会传递给应用程序,并且通过
SparkConf
合并这些值。在
SparkConf
上设置的属性具有最高的优先级,其次是传递给
spark-submit
或者
spark-shell
的属性值,最后是
spark-defaults.conf
文件中的属性值。
优先级顺序:
SparkConf > CLI > spark-defaults.conf
在
http://<driver>:4040
上的应用程序Web UI在
Environment
标签中列出了所有的Spark属性。这对你确保设置的属性的正确性是很有用的。
注意:
只有通过spark-defaults.conf, SparkConf以及命令行直接指定的值才会显示
。对于其它的配置属性,你可以认为程序用到了默认的值。
可用的属性
控制内部设置的大部分属性都有合理的默认值,一些最通用的选项设置如下:
属性名称 | 默认值 | 含义 |
---|---|---|
spark.app.name |
(none) |
你的应用程序的名字。这将在UI和日志数据中出现 |
spark.driver.cores |
1 |
driver程序运行需要的cpu内核数 |
spark.driver.maxResultSize |
1g |
每个Spark action(如collect)所有分区的序列化结果的总大小限制。设置的值应该不小于1m,0代表没有限制。如果总大小超过这个限制,程序将会终止。大的限制值可能导致driver出现内存溢出错误(依赖于
|
spark.driver.memory |
512m |
driver进程使用的内存数 |
spark.executor.memory |
512m |
每个executor进程使用的内存数。和JVM内存串拥有相同的格式(如512m,2g) |
spark.extraListeners |
(none) |
注册监听器,需要实现SparkListener |
spark.local.dir |
/tmp |
Spark中暂存空间的使用目录。在Spark1.0以及更高的版本中,这个属性被SPARK_LOCAL_DIRS(Standalone, Mesos)和LOCAL_DIRS(YARN)环境变量覆盖。 |
spark.logConf |
false |
当SparkContext启动时,将有效的SparkConf记录为INFO。 |
spark.master |
(none) |
集群管理器连接的地方 |
属性名称 | 默认值 | 含义 |
---|---|---|
spark.driver.extraClassPath |
(none) |
附加到driver的classpath的额外的classpath实体。 |
spark.driver.extraJavaOptions |
(none) |
传递给driver的JVM选项字符串。例如GC设置或者其它日志设置。注意,
|
spark.driver.extraLibraryPath |
(none) |
指定启动driver的JVM时用到的库路径 |
spark.driver.userClassPathFirst |
false |
(实验性)当在driver中加载类时,是否用户添加的jar比Spark自己的jar优先级高。这个属性可以降低Spark依赖和用户依赖的冲突。它现在还是一个实验性的特征。 |
spark.executor.extraClassPath |
(none) |
附加到executors的classpath的额外的classpath实体。这个设置存在的主要目的是Spark与旧版本的向后兼容问题。用户一般不用设置这个选项 |
spark.executor.extraJavaOptions |
(none) |
传递给executors的JVM选项字符串。例如GC设置或者其它日志设置。注意,
|
spark.executor.extraLibraryPath |
(none) |
指定启动executor的JVM时用到的库路径 |
spark.executor.logs.rolling.maxRetainedFiles |
(none) |
设置被系统保留的最近滚动日志文件的数量。更老的日志文件将被删除。默认没有开启。 |
spark.executor.logs.rolling.size.maxBytes |
(none) |
executor日志的最大滚动大小。默认情况下没有开启。值设置为字节 |
spark.executor.logs.rolling.strategy |
(none) |
设置executor日志的滚动(rolling)策略。默认情况下没有开启。可以配置为
|
spark.executor.logs.rolling.time.interval |
daily |
executor日志滚动的时间间隔。默认情况下没有开启。合法的值是
|
spark.files.userClassPathFirst |
false |
(实验性)当在Executors中加载类时,是否用户添加的jar比Spark自己的jar优先级高。这个属性可以降低Spark依赖和用户依赖的冲突。它现在还是一个实验性的特征。 |
spark.python.worker.memory |
512m |
在聚合期间,每个python worker进程使用的内存数。在聚合期间,如果内存超过了这个限制,它将会将数据塞进磁盘中 |
spark.python.profile |
false |
在Python worker中开启profiling。通过
|
spark.python.profile.dump |
(none) |
driver退出前保存分析结果的dump文件的目录。每个RDD都会分别dump一个文件。可以通过
|
spark.python.worker.reuse |
true |
是否重用python worker。如果是,它将使用固定数量的Python workers,而不需要为每个任务
|
spark.executorEnv.[EnvironmentVariableName] |
(none) |
通过
|
spark.mesos.executor.home |
driver side SPARK_HOME |
设置安装在Mesos的executor上的Spark的目录。默认情况下,executors将使用driver的Spark本地(home)目录,这个目录对它们不可见。注意,如果没有通过
|
spark.mesos.executor.memoryOverhead |
executor memory * 0.07, 最小384m |
这个值是
|
属性名称 | 默认值 | 含义 |
---|---|---|
spark.reducer.maxMbInFlight |
48 |
从递归任务中同时获取的map输出数据的最大大小(mb)。因为每一个输出都需要我们创建一个缓存用来接收,这个设置代表每个任务固定的内存上限,所以除非你有更大的内存,将其设置小一点 |
spark.shuffle.blockTransferService |
netty |
实现用来在executor直接传递shuffle和缓存块。有两种可用的实现:
|
spark.shuffle.compress |
true |
是否压缩map操作的输出文件。一般情况下,这是一个好的选择。 |
spark.shuffle.consolidateFiles |
false |
如果设置为”true”,在shuffle期间,合并的中间文件将会被创建。创建更少的文件可以提供文件系统的shuffle的效 率。这些shuffle都伴随着大量递归任务。当用ext4和dfs文件系统时,推荐设置为”true”。在ext3中,因为文件系统的限制,这个选项可 能机器(大于8核)降低效率 |
spark.shuffle.file.buffer.kb |
32 |
每个shuffle文件输出流内存内缓存的大小,单位是kb。这个缓存减少了创建只中间shuffle文件中磁盘搜索和系统访问的数量 |
spark.shuffle.io.maxRetries |
3 |
Netty only,自动重试次数 |
spark.shuffle.io.numConnectionsPerPeer |
1 |
Netty only |
spark.shuffle.io.preferDirectBufs |
true |
Netty only |
spark.shuffle.io.retryWait |
5 |
Netty only |
spark.shuffle.manager |
sort |
它的实现用于shuffle数据。有两种可用的实现:
|
spark.shuffle.memoryFraction |
0.2 |
如果
|
spark.shuffle.sort.bypassMergeThreshold |
200 |
(Advanced) In the sort-based shuffle manager, avoid merge-sorting data if there is no map-side aggregation and there are at most this many reduce partitions |
spark.shuffle.spill |
true |
如果设置为”true”,通过将多出的数据写入磁盘来限制内存数。通过
|
spark.shuffle.spill.compress |
true |
在shuffle时,是否将spilling的数据压缩。压缩算法通过
|
属性名称 | 默认值 | 含义 |
---|---|---|
spark.eventLog.compress |
false |
是否压缩事件日志。需要
|
spark.eventLog.dir |
file:///tmp/spark-events |
Spark事件日志记录的基本目录。在这个基本目录下,Spark为每个应用程序创建一个子目录。各个应用程序记录日志到直到的目录。用户可能想设置这为统一的地点,像HDFS一样,所以历史文件可以通过历史服务器读取 |
spark.eventLog.enabled |
false |
是否记录Spark的事件日志。这在应用程序完成后,重新构造web UI是有用的 |
spark.ui.killEnabled |
true |
运行在web UI中杀死stage和相应的job |
spark.ui.port |
4040 |
你的应用程序dashboard的端口。显示内存和工作量数据 |
spark.ui.retainedJobs |
1000 |
在垃圾回收之前,Spark UI和状态API记住的job数 |
spark.ui.retainedStages |
1000 |
在垃圾回收之前,Spark UI和状态API记住的stage数 |
属性名称 | 默认值 | 含义 |
---|---|---|
spark.broadcast.compress |
true |
在发送广播变量之前是否压缩它 |
spark.closure.serializer |
org.apache.spark.serializer.JavaSerializer |
闭包用到的序列化类。目前只支持java序列化器 |
spark.io.compression.codec |
snappy |
压缩诸如RDD分区、广播变量、shuffle输出等内部数据的编码解码器。默认情况下,Spark提供了三种选择:lz4、lzf和snappy,你也可以用完整的类名来制定。 |
spark.io.compression.lz4.block.size |
32768 |
LZ4压缩中用到的块大小。降低这个块的大小也会降低shuffle内存使用率 |
spark.io.compression.snappy.block.size |
32768 |
Snappy压缩中用到的块大小。降低这个块的大小也会降低shuffle内存使用率 |
spark.kryo.classesToRegister |
(none) |
如果你用Kryo序列化,给定的用逗号分隔的自定义类名列表表示要注册的类 |
spark.kryo.referenceTracking |
true |
当用Kryo序列化时,跟踪是否引用同一对象。如果你的对象图有环,这是必须的设置。如果他们包含相同对象的多个副本,这个设置对效率是有用的。如果你知道不在这两个场景,那么可以禁用它以提高效率 |
spark.kryo.registrationRequired |
false |
是否需要注册为Kyro可用。如果设置为true,然后如果一个没有注册的类序列化,Kyro会抛出异常。如果设置为false,Kryo将会同时写每个对象和其非注册类名。写类名可能造成显著地性能瓶颈。 |
spark.kryo.registrator |
(none) |
如果你用Kryo序列化,设置这个类去注册你的自定义类。如果你需要用自定义的方式注册你的类,那么这个属性是有用的。否则
|
spark.kryoserializer.buffer.max.mb |
64 |
Kryo序列化缓存允许的最大值。这个值必须大于你尝试序列化的对象 |
spark.kryoserializer.buffer.mb |
0.064 |
Kyro序列化缓存的大小。这样worker上的每个核都有一个缓存。如果有需要,缓存会涨到
|
spark.rdd.compress |
true |
是否压缩序列化的RDD分区。在花费一些额外的CPU时间的同时节省大量的空间 |
spark.serializer |
org.apache.spark.serializer.JavaSerializer |
序列化对象使用的类。默认的Java序列化类可以序列化任何可序列化的java对象但是它很慢。所有我们建议用 org.apache.spark.serializer.KryoSerializer |
spark.serializer.objectStreamReset |
100 |
当用
|
属性名称 | 默认值 | 含义 |
---|---|---|
spark.broadcast.blockSize |
4096 |
TorrentBroadcastFactory传输的块大小,太大值会降低并发,太小的值会出现性能瓶颈 |
spark.broadcast.factory |
org.apache.spark.broadcast.TorrentBroadcastFactory |
broadcast实现类 |
spark.cleaner.ttl |
(infinite) |
spark记录任何元数据(stages生成、task生成等)的持续时间。定期清理可以确保将超期的元数据丢弃,这在运行长时间任务是很有用的,如运行7*24的sparkstreaming任务。RDD持久化在内存中的超期数据也会被清理 |
spark.default.parallelism |
本地模式:机器核数;Mesos:8;其他:
|
如果用户不设置,系统使用集群中运行shuffle操作的默认任务数(groupByKey、 reduceByKey等) |
spark.executor.heartbeatInterval |
10000 |
executor 向 the driver 汇报心跳的时间间隔,单位毫秒 |
spark.files.fetchTimeout |
60 |
driver 程序获取通过
|
spark.files.useFetchCache |
true |
获取文件时是否使用本地缓存 |
spark.files.overwrite |
false |
调用
|
spark.hadoop.cloneConf |
false |
每个task是否克隆一份hadoop的配置文件 |
spark.hadoop.validateOutputSpecs |
true |
是否校验输出 |
spark.storage.memoryFraction |
0.6 |
Spark内存缓存的堆大小占用总内存比例,该值不能大于老年代内存大小,默认值为0.6,但是,如果你手动设置老年代大小,你可以增加该值 |
spark.storage.memoryMapThreshold |
2097152 |
内存块大小 |
spark.storage.unrollFraction |
0.2 |
Fraction of spark.storage.memoryFraction to use for unrolling blocks in memory. |
spark.tachyonStore.baseDir |
System.getProperty(“java.io.tmpdir”) |
Tachyon File System临时目录 |
spark.tachyonStore.url |
tachyon://localhost:19998 |
Tachyon File System URL |
属性名称 | 默认值 | 含义 |
---|---|---|
spark.driver.host |
(local hostname) |
driver监听的主机名或者IP地址。这用于和executors以及独立的master通信 |
spark.driver.port |
(random) |
driver监听的接口。这用于和executors以及独立的master通信 |
spark.fileserver.port |
(random) |
driver的文件服务器监听的端口 |
spark.broadcast.port |
(random) |
driver的HTTP广播服务器监听的端口 |
spark.replClassServer.port |
(random) |
driver的HTTP类服务器监听的端口 |
spark.blockManager.port |
(random) |
块管理器监听的端口。这些同时存在于driver和executors |
spark.executor.port |
(random) |
executor监听的端口。用于与driver通信 |
spark.port.maxRetries |
16 |
当绑定到一个端口,在放弃前重试的最大次数 |
spark.akka.frameSize |
10 |
在”control plane”通信中允许的最大消息大小。如果你的任务需要发送大的结果到driver中,调大这个值 |
spark.akka.threads |
4 |
通信的actor线程数。当driver有很多CPU核时,调大它是有用的 |
spark.akka.timeout |
100 |
Spark节点之间的通信超时。单位是秒 |
spark.akka.heartbeat.pauses |
6000 |
This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart beat pause in seconds for akka. This can be used to control sensitivity to gc pauses. Tune this in combination of
|
spark.akka.failure-detector.threshold |
300.0 |
This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). This maps to akka’s
|
spark.akka.heartbeat.interval |
1000 |
This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka’s failure detector. Tune this in combination of
|
属性名称 | 默认值 | 含义 |
---|---|---|
spark.task.cpus |
1 |
为每个任务分配的内核数 |
spark.task.maxFailures |
4 |
Task的最大重试次数 |
spark.scheduler.mode |
FIFO |
Spark的任务调度模式,还有一种Fair模式 |
spark.cores.max |
当应用程序运行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内核总数(不是指每 台机器,而是整个集群)。如果不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值,而Mesos将使 用集群中可用的内核 |
|
spark.mesos.coarse |
False |
如果设置为true,在Mesos集群中运行时使用粗粒度共享模式 |
spark.speculation |
False |
以下几个参数是关于Spark推测执行机制的相关参数。此参数设定是否使用推测执行机制,如果设置为true则spark使用推测执行机制,对于Stage中拖后腿的Task在其他节点中重新启动,并将最先完成的Task的计算结果最为最终结果 |
spark.speculation.interval |
100 |
Spark多长时间进行检查task运行状态用以推测,以毫秒为单位 |
spark.speculation.quantile |
推测启动前,Stage必须要完成总Task的百分比 |
|
spark.speculation.multiplier |
1.5 |
比已完成Task的运行速度中位数慢多少倍才启用推测 |
spark.locality.wait |
3000 |
以下几个参数是关于Spark数据本地性的。本参数是以毫秒为单位启动本地数据task的等待时间,如果超出就启动下一本地优先级别 的task。该设置同样可以应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 任意节点 ),当然,也可以通过spark.locality.wait.node等参数设置不同优先级别的本地性 |
spark.locality.wait.process |
spark.locality.wait |
本地进程级别的本地等待时间 |
spark.locality.wait.node |
spark.locality.wait |
本地节点级别的本地等待时间 |
spark.locality.wait.rack |
spark.locality.wait |
本地机架级别的本地等待时间 |
spark.scheduler.revive.interval |
1000 |
复活重新获取资源的Task的最长时间间隔(毫秒),发生在Task因为本地资源不足而将资源分配给其他Task运行后进入等待时间,如果这个等待时间内重新获取足够的资源就继续计算 |
属性名称 | 默认值 | 含义 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
spark.dynamicAllocation.enabled |
false |
是否开启动态资源搜集 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
spark.dynamicAllocation.executorIdleTimeout |
600 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
spark.dynamicAllocation.maxExecutors |
Integer.MAX_VALUE |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
spark.dynamicAllocation.minExecutors |
0 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
spark.dynamicAllocation.schedulerBacklogTimeout |
5 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
|