配置项分为 3 类:
读取配置项顺序 :SparkConf 对象 -> 命令行参数 -> 配置文件
资源类别 | 配置项 | 含义 |
---|---|---|
CPU | spark.cores.max | 集群满 CPU 核 |
spark.executor.cores | 每个 Executors 可用的 CPU Cores | |
spark.default.parallelism | 默认并行度 | |
spark.sql.shuffle.partitions | Reduce 的默认并行度 | |
spark.task.cpus | 每个任务可用的 CPU 核 | |
spark.executor.instances | 集群内 Executors 的个数 | |
内存 | spark.executor.memory | 单个 Executor 的堆内内存总大小 |
spark.memory.offHeap.enabled | 是否启动堆外内存 | |
spark.memory.offHeap.size | 单个 Executorp 的堆外内存总大小 | |
spark.memory.fraction | 除 User Memory 外的内存空间占比 | |
spark.memory.storageFraction | 缓存RDD的内存占比,执行内存占比= 1 - spark.memory.storageFraction | |
spark.rdd.compress | RDD缓存是否压缩,默认不压缩 | |
磁盘 | spark.local.dir | 存储 Shuffle 中间文件/RDD Cache 的磁盘目录 |
配置项:
spark.cores.max | 集群满 CPU 核 |
---|---|
spark.executor.cores | 每个 Executors 可用的 CPU 核 |
spark.task.cpus | 每个任务可用的 CPU 核 |
spark.executor.instances | 集群内 Executors 的个数 |
并行度 : 定义分布式数据集划分的份数/粒度,决定了分布式任务的计算负载。并行度越高,数据的粒度越细,数据分片越多,数据越分散
并行度的配置项 :
spark.default.parallelism | 默认并行度 |
---|---|
spark.sql.shuffle.partitions | Reduce 的默认并行度 |
并行计算任务:在任一时刻整个集群能够同时计算的任务数量
spark.executor.instances
* spark.executor.cores
达到 CPU、内存、数据之间的平衡的约定 :
spark.executor.cores
指定 CPU Cores ,记为 cExecution Memory
内存大小 ,记为 m公式量化 :
# D/P = 数据分片大小,m/c = 每个 Task 分到的可用内存
D/P ~ m/c
内存配置项 :
spark.executor.memory | 单个 Executor 的堆内内存总大小 |
---|---|
spark.memory.offHeap.size | 单个 Executorp 的堆外内存总大小 |
(spark.memory.offHeap.enabled=true ) | |
spark.memory.fraction | 堆内内存中,用于缓存RDD和执行计算的内存比例 |
spark.memory.storageFraction | 缓存RDD的内存占比,执行内存占比= 1 - spark.memory.storageFraction |
spark.rdd.compress | RDD缓存是否压缩,默认不压缩 |
堆外存储:
处理数据集:
User Memory :存储开发者自定义的数据结构,这些数据结构需要协助分布式数据集的处理
spark.memory.fraction
: 明确 Spark 可支配内存占比,即 :User Memory 堆内占比 = 1 - spark.memory.fraction
spark.memory.fraction
:系数越大,Spark 可支配的内存越多,User Memory 占比越小spark.memory.fraction
默认值是 0.6,JVM 堆内的 60% 给 Spark支配,40% 给 User Memory调整内存相对占比:
spark.memory.fraction
调低,用于分布式计算和缓存分布式数据集spark.memory.fraction
调高,用于分布式计算和缓存分布式数据集sf 的设置情况:
JVM 把 Heap 堆内内存分为:
Full GC时,会引发 STW:
为了 RDD cache 访问效率,用 RDD/DataFrame/Dataset.cache
,以对象值形式缓存到内存 (避免序列化消耗)
spark.rdd.compress
:RDD 缓存默认不压缩
磁盘的配置项:
有条件可以设置个大而性能好的文件系统,如:空间足够大的 SSD 文件系统目录
spark.shuffle.file.buffer | Map 输出端的写缓冲区的大小 |
---|---|
spark.reducer.maxSizeInFlight | Reduce 输入端的读缓冲区的大小 |
spark.shuffle.sort.bypassMergeThreshold | Map 阶段不进行排序的分区阈值 |
Shuffle 的计算的两个阶段:
自 Spark 1.6 后,全用 Sort shuffle manager 管理 Shuffle
repartition、groupBy 就没有排序的需求,引入的排序就是额外的计算开销
spark.shuffle.sort.bypassMergeThreshold
改变 Reduce 端的并行度(默认值 200)。当 Reduce 的分区数 < 该值时,Shuffle 就不会引入排序作用 | 配置项 | 含义 |
---|---|---|
AQE | spark.sql.adaptive.enabled | 是否启用 AQE |
Join 策略 | spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin | 非空分区比例 < 该值,调整Join策略 |
spark.sql.autoBroadcastJoinThreshold | 基表 < 该值, 触发Broadcast Join | |
自动分区合并 | spark.sql.adaptive.coalescePartitions.enabled | 是否启用合并分区 |
spark.sql.adaptive.advisoryPartitionSizelnBytes | 合并后的目标分区大小 | |
spark.sql.adaptive.coalescePartitions.minPartitionNum | 分区合并后,并行度 > 该值 | |
自动倾斜处理 | spark.sql.adaptive.skewJoin.enabled | 是否自动处理数据倾斜 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 倾斜分区的比例系数 | |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdlnBytes | 倾斜分区的最低阀值 | |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 拆分倾斜分区粒度 (字节) |
Spark 3.0 推出 AQE (Adaptive Query Execution, 自适应查询执行) 的 3 个动态优化特性: Join 策略调整、自动分区合并、自动倾斜处理
# 启用 AQE
spark.sql.adaptive.enabled true
Join 策略调整 : Spark SQL 在运行时动态调整为 Broadcast Join
autoBroadcastJoinThreshold
时,下个阶段就可能变为 Broadcast Join动态 Join 策略的条件二 :大表过滤后,非空分区比例 < nonEmptyPartitionRatioForBroadcastJoin
,才能成功触发 Broadcast Join 降级
配置项:
# AQE前,基表 < 该值,就会触发 Broadcast Join
spark.sql.autoBroadcastJoinThreshold 10m# AQE后,非空分区比例 < 该值,就调整动态 Join 策略
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.5
Spark SQL 的广播阈值对比的两种情况:
DataFrame 执行计划中的统计值 :
val df: DataFrame = _
// 先对分布式数据集加Cache
df.cache.count// 获取执行计划
val plan = df.queryExecution.logical// 获取执行计划对于数据集大小的精确预估
val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes
自动分区合并 :解决 Reduce 过小的分区,而导致的数据的不均衡问题
分区合并示意图 :
# 是否启用自动分区合并,默认启用
spark.sql.adaptive.coalescePartitions.enabled true# 合并后的目标分区大小
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB# 分区合并后,并行度 > 该值
spark.sql.adaptive.coalescePartitions.minPartitionNum 1
例子 :Shuffle 中间文件 = 20GB,minPartitionNum = 200,
自动倾斜处理:把倾斜的数据分区拆分成小分区
中位数 * skewedPartitionFactor
,得到判定阈值。凡是 > 阈值的数据分区,就可能认为倾斜分区skewedPartitionThresholdInBytes
,就会判定为倾斜分区配置项 :
# 开启自动倾斜处理
spark.sql.adaptive.skewJoin.enabled true# 判断大分区,倾斜分区的比例系数
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5# 判断大分区,倾斜分区的最低阔值
spark.sql.adaptive.skewJoin.skewedPartitionThresholdinBytes 256MB# 拆分大分区 , 倾斜分区的拆分单位
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB
例子:数据表有 3 个分区:90MB、100MB 、512MB。中位数是 100MB