Spark 配置项
创始人
2024-05-30 11:11:37
0

Spark 配置项

  • 硬件资源类
    • CPU
    • 内存
      • 堆外内
      • User Memory/Spark 可用内存
      • Execution/Storage Memory
    • 磁盘
  • Shuffle
  • Spark SQL
    • Join 策略调整
    • 自动分区合并
    • 自动倾斜处理

配置项分为 3 类:

  • 硬件资源类 : 与 CPU、内存、磁盘有关的配置项
  • Shuffle 类 : Shuffle 计算过程的配置项
  • Spark SQL : Spark SQL 优化配置项

读取配置项顺序 :SparkConf 对象 -> 命令行参数 -> 配置文件

硬件资源类

资源类别配置项含义
CPUspark.cores.max集群满 CPU 核
spark.executor.cores每个 Executors 可用的 CPU Cores
spark.default.parallelism默认并行度
spark.sql.shuffle.partitionsReduce 的默认并行度
spark.task.cpus每个任务可用的 CPU 核
spark.executor.instances集群内 Executors 的个数
内存spark.executor.memory单个 Executor 的堆内内存总大小
spark.memory.offHeap.enabled是否启动堆外内存
spark.memory.offHeap.size单个 Executorp 的堆外内存总大小
spark.memory.fraction除 User Memory 外的内存空间占比
spark.memory.storageFraction缓存RDD的内存占比,执行内存占比= 1 - spark.memory.storageFraction
spark.rdd.compressRDD缓存是否压缩,默认不压缩
磁盘spark.local.dir存储 Shuffle 中间文件/RDD Cache 的磁盘目录

CPU

配置项:

spark.cores.max集群满 CPU 核
spark.executor.cores每个 Executors 可用的 CPU 核
spark.task.cpus每个任务可用的 CPU 核
spark.executor.instances集群内 Executors 的个数

并行度 : 定义分布式数据集划分的份数/粒度,决定了分布式任务的计算负载。并行度越高,数据的粒度越细,数据分片越多,数据越分散

并行度的配置项 :

spark.default.parallelism默认并行度
spark.sql.shuffle.partitionsReduce 的默认并行度

并行计算任务:在任一时刻整个集群能够同时计算的任务数量

  • 整个集群的并行计算任务数 = spark.executor.instances * spark.executor.cores

达到 CPU、内存、数据之间的平衡的约定 :

  • spark.executor.cores 指定 CPU Cores ,记为 c
  • Execution Memory 内存大小 ,记为 m
  • 分布式数据集的大小记为 D ,并行度记为 P,D/P = 每个数据分片大小
  • 一个数据分片对应着一个 Task(分布式任务),而一个 Task 又对应着一个 CPU Core

公式量化 :

# D/P = 数据分片大小,m/c = 每个 Task 分到的可用内存
D/P ~ m/c

内存

内存配置项 :

spark.executor.memory单个 Executor 的堆内内存总大小
spark.memory.offHeap.size单个 Executorp 的堆外内存总大小
(spark.memory.offHeap.enabled=true)
spark.memory.fraction堆内内存中,用于缓存RDD和执行计算的内存比例
spark.memory.storageFraction缓存RDD的内存占比,执行内存占比= 1 - spark.memory.storageFraction
spark.rdd.compressRDD缓存是否压缩,默认不压缩

在这里插入图片描述

  • Reserved Memory 大小固定为 300MB
  • M 指定了 Executor 进程的 JVM Heap 大小 ( Executor Memory )
  • Execution Memory 的组成: Execution Memory、Storage Memory 、UserMemory
  • User Memory : 存储用户自定义的数据结构,如 : RDD 的各类实例化对象或集合类型(如: 数组、列表等)
  • Spark 1.6 后,推出了动态内存管理模式,Execution Memory/Storage Memory 能互相抢占

堆外内

堆外存储:

  • int 的用户 ID、String 的姓名、int的年龄、Char 的性别

在这里插入图片描述

处理数据集:

  • 数据模式比较扁平,而且字段多是定长数据类型,就更多使用堆外内存
  • 数据模式很复杂,嵌套结构/变长字段很多,就更多使用 JVM 堆内内存

User Memory/Spark 可用内存

User Memory :存储开发者自定义的数据结构,这些数据结构需要协助分布式数据集的处理

spark.memory.fraction : 明确 Spark 可支配内存占比,即 :User Memory 堆内占比 = 1 - spark.memory.fraction

  • spark.memory.fraction :系数越大,Spark 可支配的内存越多,User Memory 占比越小
  • spark.memory.fraction 默认值是 0.6,JVM 堆内的 60% 给 Spark支配,40% 给 User Memory

调整内存相对占比:

  • 自定义数据结构多,spark.memory.fraction 调低,用于分布式计算和缓存分布式数据集
  • 自定义数据结构少,spark.memory.fraction 调高,用于分布式计算和缓存分布式数据集

Execution/Storage Memory

sf 的设置情况:

  • ETL :RDD Cache 使用少。就能将 sf 设低点,让 Execution Memory 大点
  • 缓存密集型 :机器学习:RDD Cache 使用较多,就能把 sf 设高点,让 Storage Memory 大点
  • 过多的缓存会引发 GC(Garbage Collection,垃圾回收)

JVM 把 Heap 堆内内存分为:

  • 年轻代:存储生命周期较短、引用次数较低的对象,会引发 Young GC
  • 老年代:存储生命周期较长、引用次数高的对象,会引发 Full GC
  • RDDcache 会存在老年代

Full GC时,会引发 STW:

  • 抢占应用程序执行线程,把所有 CPU 线程都做垃圾回收,应用程序的暂时不执行(Stop the world)
  • 等 Full GC 完事后,才把 CPU 线程释放,应用程序才能继续执行
  • Full GC 弊端远大于 Young GC

为了 RDD cache 访问效率,用 RDD/DataFrame/Dataset.cache ,以对象值形式缓存到内存 (避免序列化消耗)

  • 用对象值形式缓存数据,每条数据都要构成一个对象 (自定义Case class, Row 对象)
  • 当大量的 RDD cache 时,会引发 Full GC
  • 当应用是缓存密集型,需要大量缓存,为了执行效率,可以改用序列化

spark.rdd.compress :RDD 缓存默认不压缩

  • 启用压缩后,能节省缓存内存的占用,把更多的内存空间留给分布式任务执行
  • 启用压缩后,会引入额外的计算开销、牺牲 CPU

磁盘

磁盘的配置项:

  • spark.local.dir :任意的本地文件系统目录,默认值是 /tmp 。 用于存储各种各样的临时数据,如: Shuffle 中间文件、RDD Cache。

有条件可以设置个大而性能好的文件系统,如:空间足够大的 SSD 文件系统目录

Shuffle

spark.shuffle.file.bufferMap 输出端的写缓冲区的大小
spark.reducer.maxSizeInFlightReduce 输入端的读缓冲区的大小
spark.shuffle.sort.bypassMergeThresholdMap 阶段不进行排序的分区阈值

Shuffle 的计算的两个阶段:

  • Map 阶段:执行映射逻辑,并按 Reducer 的分区规则,将中间数据写入到本地磁盘
  • Reduce 阶段:从各个节点下载数据分片,并根据需要实现聚合计算
  • Map 阶段的计算结果(中间文件),会存储到写缓冲区(Write Buffer),满后再写入到磁盘文件系统
  • Reduce 阶段,通过网络从不同节点的磁盘中拉取中间文件,以数据块暂存到计算节点的读缓冲区(Read Buffer),满后再写入到磁盘文件系统

自 Spark 1.6 后,全用 Sort shuffle manager 管理 Shuffle

  • Sort shuffle manager 会把 Map/Reduce 都引入排序

repartition、groupBy 就没有排序的需求,引入的排序就是额外的计算开销

  • 不需要聚合/排序时,调整 spark.shuffle.sort.bypassMergeThreshold 改变 Reduce 端的并行度(默认值 200)。当 Reduce 的分区数 < 该值时,Shuffle 就不会引入排序

Spark SQL

作用配置项含义
AQEspark.sql.adaptive.enabled是否启用 AQE
Join 策略spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin非空分区比例 < 该值,调整Join策略
spark.sql.autoBroadcastJoinThreshold基表 < 该值, 触发Broadcast Join
自动分区合并spark.sql.adaptive.coalescePartitions.enabled是否启用合并分区
spark.sql.adaptive.advisoryPartitionSizelnBytes合并后的目标分区大小
spark.sql.adaptive.coalescePartitions.minPartitionNum分区合并后,并行度 > 该值
自动倾斜处理spark.sql.adaptive.skewJoin.enabled是否自动处理数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionFactor倾斜分区的比例系数
spark.sql.adaptive.skewJoin.skewedPartitionThresholdlnBytes倾斜分区的最低阀值
spark.sql.adaptive.advisoryPartitionSizeInBytes拆分倾斜分区粒度 (字节)

Spark 3.0 推出 AQE (Adaptive Query Execution, 自适应查询执行) 的 3 个动态优化特性: Join 策略调整、自动分区合并、自动倾斜处理

# 启用 AQE
spark.sql.adaptive.enabled true

Join 策略调整

Join 策略调整 : Spark SQL 在运行时动态调整为 Broadcast Join

  • 每当 DAG 中的 Map 阶段执行完毕,会结合 Shuffle 中间文件的统计信息,重新计算 Reduce 数据表的存储大小。当基表 < autoBroadcastJoinThreshold时,下个阶段就可能变为 Broadcast Join

动态 Join 策略的条件二 :大表过滤后,非空分区比例 < nonEmptyPartitionRatioForBroadcastJoin,才能成功触发 Broadcast Join 降级

  • 例子 :大表有 100 个分区,过滤后只有 15 个分区有数据
  • 非空分区比例 : 15 / 100 = 15% < 20% , 就触发 Broadcast Join 降级

配置项:

# AQE前,基表 < 该值,就会触发 Broadcast Join
spark.sql.autoBroadcastJoinThreshold 10m# AQE后,非空分区比例 < 该值,就调整动态 Join 策略
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.5

Spark SQL 的广播阈值对比的两种情况:

  • 基表来自文件系统,用基表在磁盘的存储大小与广播阈值对比
  • 基表来自 DAG 的中间文件,用 DataFrame 执行计划中的统计值与广播阈值对比

DataFrame 执行计划中的统计值 :

val df: DataFrame = _
// 先对分布式数据集加Cache
df.cache.count// 获取执行计划
val plan = df.queryExecution.logical// 获取执行计划对于数据集大小的精确预估
val estimated: BigInt = spark.sessionState.executePlan(plan).optimizedPlan.stats.sizeInBytes

自动分区合并

自动分区合并 :解决 Reduce 过小的分区,而导致的数据的不均衡问题

分区合并示意图 :

  • 依序扫描数据分区,当相邻分区的尺寸之和 > 实际大小时,就把扫描过的分区做一次合并

在这里插入图片描述

# 是否启用自动分区合并,默认启用
spark.sql.adaptive.coalescePartitions.enabled true# 合并后的目标分区大小
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB# 分区合并后,并行度 > 该值
spark.sql.adaptive.coalescePartitions.minPartitionNum 1
  • 每个分区的平均大小 = 数据集大小/最低并行度
  • 实际大小 = min(advisoryPartitionSizeInBytes , 分区的平均大小)

例子 :Shuffle 中间文件 = 20GB,minPartitionNum = 200,

  • 每个分区的尺寸= 20GB / 200 =100MB
  • 设 advisoryPartitionSizeInBytes = 200MB,最终分区 = min(100MB,200MB) = 100MB

自动倾斜处理

自动倾斜处理:把倾斜的数据分区拆分成小分区

  • 对所有数据分区按大小做排序,取中位数。将 中位数 * skewedPartitionFactor ,得到判定阈值。凡是 > 阈值的数据分区,就可能认为倾斜分区
  • 当可能倾斜分区 > skewedPartitionThresholdInBytes,就会判定为倾斜分区

配置项 :

# 开启自动倾斜处理
spark.sql.adaptive.skewJoin.enabled true# 判断大分区,倾斜分区的比例系数
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5# 判断大分区,倾斜分区的最低阔值
spark.sql.adaptive.skewJoin.skewedPartitionThresholdinBytes 256MB# 拆分大分区 , 倾斜分区的拆分单位
spark.sql.adaptive.advisoryPartitionSizelnBytes 256MB

例子:数据表有 3 个分区:90MB、100MB 、512MB。中位数是 100MB

  • 判定阈值 = 中位数 * skewedPartitionFactor = 100MB * 5 = 500MB
  • 512MB 为候选分区
  • 512MB > skewedPartitionThresholdInBytes(256MB) ,就认为该分区是倾斜分区
  • 512MB < skewedPartitionThresholdInBytes(1GB) ,就不是倾斜分区
  • 再根据 advisoryPartitionSizeInBytes(256MB) , 对大分区进行拆分
  • 512MB 被拆成两个小分区(512MB / 2 = 256MB)

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...