2.1.8、Memory Model
2.1.8.1、Tungsten Origin background
众所周知,Spark是由Scala+Java开发的一种基于内存计算的大数据解决方案,底层运行在JVM上,那么自然而然的会有GC的问题反过来限制Spark的性能,而且绝大多数Spark应用程序的主要瓶颈不在于IO/网络,而是在于CPU和内存。此时Project Tungsten由DataBricks提出,并在Spark1.5中引入,在1.6.X对内存进行了优化,在2.X对CPU进行了优化,也就是说该项目主要是针对于CPU和Memory进行优化,具体优化集中在以下三个方面:
Memory Management And Binary Processing(内存管理和二进制处理)
Cacahe-aware Computation(缓存感知计算):使用了友好的数据结构和算法来完成数据的存储和复用,提升缓存命中率
Code Generation(代码生成):扩展了更多的表达式求值和SQL操作,把已有的代码变成本地的字节,不需要很多的抽象和匹配等,避免了昂贵的虚拟函数调用
2.1.8.2、Tungsten Optimize-Memory
本篇主要讲述tungsten在内存这块的优化点,以及spark是如何进行内存分配的(On-Heap和Off-Heap结合,Storage/Executor/Other划分),通过何种方式的寻址(通过引入了Page table来管理On-Heap和Off-Heap)来实现的统一内存管理。
2.1.8.2.0 堆划分
这里先复习一下Spark运行的整体流程:
1、通过spark-submit命令提交Spark作业,启动Driver(根据不同的模式如yarn-client,yarn-cluster,启动点不同),生成SparkContext对象(这里会进行DAG-->Stage-->Task划分)
2、SparkContext和Cluster Manager进行通信,申请资源以及后续的任务分配和监控,并在指定Worker节点上启动Executor
3、SparkContext在实例化的时候,会构建DAG,并分解为多个Stage,并把每个Stage中的TaskSet发送给TaskScheduler
4、Executor向Driver申请Task,然后Driver将应用程序以及相关依赖包发送到Executor端,并在Executor端执行task
5、Executor将task运行结果反馈给TaskScheduler,然后再反馈给DAGScheduler
当整个作业结束后,SparkContext会向ClusterManager注销并释放所有资源
从运行的整体流程上看,Driver端的工作主要是负责创建SparkContext,提交作业以及协调任务;Executor端的工作主要就是执行task。
从内存使用的角度上看,Executor端的内存设计相当比较复杂些,所以本文也将基于Executor的内存进行概述(本文中讲到Spark内存指的也是Executor端的内存)。
那么接下来再针对Executor端的内存设计进行拆解,见下图:
Worker节点启动的Executor其实也是一个JVM进程,因此Executor的内存管理是建立在JVM内存管理上的(On-Heap堆内内存),同时Spark也引入了Off-Heap(堆外内存),使之可以直接在系统内存中开辟空间,避免了在数据处理过程中不必要的序列化和反序列化的开销,同时也降低了GC的开销。
2.1.8.2.1 On-Heap(堆内)
Spark对堆内内存的管理其实只是一种逻辑上的管理,内存的申请和释放都是由JVM来完成的,而Spark只是在申请后和释放前记录这些内存。
申请内存后:如创建一个对象,JVM从堆内内存中分配空间,并返回对象的引用,而Spark会保存该对象的引用,记录该对象占用的内存
释放内存前:spark删除该对象的引用,记录该对象释放的内存,等待JVM来真正释放掉该对象占用的内存
这里需要说明一下spark关于序列化的一个小知识点:
经过序列化的对象,是以字节流的形式存在,占用的内存大小是可以直接计算,而对于非序列化的对象,占用的内存只能通过周期性地采样近似估算得到的,也就是说每次新增的数据项都会计算一次占用的内存大小,这种方式会有一定的误差,可能会导致某一时刻的实际内存超过预期。当被Spark标记为释放的对象实例时,也有可能没有被JVM回收,导致实际可用的内存小于spark记录的可用内存,造成OOM的发生。
为了减少OOM异常的发生,Spark对堆内内存再次进行了划分(即分为Storage,Executor,Other,下一小节将进行详解),通过内存划分方式各自规划管理来提升内存的利用率。
2.1.8.2.2 Off-Heap(堆外)
为了解决基于JVM托管方式存在的缺陷,Tungsten引入了基于Off-Heap管理内存的方式,通过sun.misc.Unsafe管理内存,这样可以使得Spark的operation直接使用分配的二进制数据,而不是JVM对象,降低了GC带来的开销。而且对于序列化的数据占用的空间可以被精准计算,相对堆内内存来说降低了管理难度。当然默认情况堆外内存是没有启用的,需要通过配置参数spark.memory.offHeap.enabled来启用.
2.1.8.3、Memory division
根据内存使用目的不同,对堆内外内存进行了如上图的划分:
针对堆内内存来说,划分了4块:
存储内存(Storage Memory):该部分的内存主要是用于缓存或者内部数据传输过程中使用的,比如缓存RDD或者广播数据
执行内存(Execution Memory):该部分的内存主要用于计算的内存,包括shuffles,Joins,sorts以及aggregations
其他内存(Other Memory):该部分的内存主要给存储用户定义的数据结构或者spark内部的元数据
预留内存:和other内存作用是一样的,但由于spark对堆内内存采用估算的方式,所以提供了预留内存来保障有足够的空间
针对堆外内存来说,划分了2块(前面也提到过了spark对堆外内存的使用可以精准计算):
1、存储内存(Storage Memory)
2、执行内存(Execution Memory)
2.1.8.4、Memory management
Tungsten使用了Off-Heap使得spark实现了自己独立的内存管理,避免了GC引发的性能问题,省去了序列化和反序列化的过程。Spark1.6版本之前使用了静态内存管理模式,而在此之后使用统一内存管理模型,并,可以直接操作内存中的二进制数据,而不是Java对象,很大程度上摆脱了JVM内存管理的限制。
2.1.8.4.1 Spark1.6之前-静态内存管理模型
下面的两张图相信有些读者已经很熟悉了。
静态内存模型最大的特点就是:堆内内存中的每个区域的大小在spark应用程序运行期间是固定的,用户可以在启动前进行配置;这也需要用户对spark的内存模型非常熟悉,否则会因为配置不当造成严重后果
对于堆内内存区域的划分以及比例如下图:
1、存储内存(Storage Memory): 默认情况下,存储内存占用整个堆内存的60%(该占比由spark.storage.memoryFraction来控制),主要用了存储缓存的RDD或者广播数据;
2、执行内存(Execution Memory): 默认情况下,执行内存占用整个堆内存的20%(该占比由spark.shuffle.memoryFraction来控制),主要用来存储进行shuffle计算的内容
3、预留内存: 默认情况下,预留内存占用整个堆内存的20%(该占比取决于上面两个内存区域的大小),主要用来存储一些元数据或者用户定义的数据结构
这里需要说一下Unroll过程:RDD以Block形式被缓存到存储内存,Record在堆内或堆外存储内存中占用一块连续的空间。把Partition由不连续的存储空间转换为连续空间的过程就是Unroll,也称之为展开操作
对于堆外内存的划分,比较简单,即只有存储内存和执行内存(具体原因上文也已经讲到了,即spark对堆外内存的使用计算是比较精确的,所以不需要额外的预留空间来避免OOM的发生)
源码实现
基类MemoryManager封装了静态内存管理模型和统一内存管理模型,而StaticMemoryManager类负责实现静态内存模型,UnifiedMemoryManager类实现统一内存模型。具体采用哪种内存分配由tungstenMemoryMode来决定,即由MemoryAllocator来负责具体分配(分别实现了两个子类),其中allocate和free函数来提供内存的分配和释放,分配的内存以MemoryBlock来表示。
静态内存模型管理器-StaticMemoryManager类
//Unrol1过程中可用的内存,占最大storage内存的8.2
private val maxUnrollMemory: Long = {(max0nHeapstorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}//获取最大的storage内存
private def getMaxStorageMemory(conf: SparkConf): Long = {val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)//Storage内存占全部内存占比val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) //Storage内存的安全系数val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) (SystemMaxMemory memoryFraction safetyFraction).toLong
}//获取最大的Execution内存
private def getMaxExecutionMemory(conf: SparkConf): Long = {val systemMaxMemory = conf.getlong("spark.testing.memory", Runtime.getRuntime.maxMemory)if (conf.contains("spark.executor.memory”)) {val executorMemory = conf.getSizeAsBytes("spark.executor.memory")}//Execution内存占全部内存占比val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) //Execution内存的安全系数val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) (SystemMaxMemory memoryFraction safetyFraction).toLong
}
2.1.8.4.2、Spark1.6之后-统一内存管理模型
关于新的统一内存管理模型,有兴趣的读者可以参考https://issues.apache.org/jira/secure/attachment/12765646/unified-memory-management-spark-l0000.pdf
统一内存模型和静态内存模型的区别在于:存储内存和执行内存共享同一块空间(占堆内内存的60%),且可以动态占用对方的空闲区域
统一内存模型关于堆内内存区域的划分,这里有以下几点需要注意:
1、执行内存和存储内存共享同一空间,该空间占用可用内存的60%,例如我们设置1G内存,那么Execution和Storage共用内存就是(1024-300)*0.6 = 434MB
2、执行内存和存储内存可以互相占用对方空闲空间
3、存储内存可以借用执行内存,直到执行内存重新占用它的空间为止。当发生这种情况的时候,缓存块将从内存中清除,直到足够的借入内存被释放,满足执行内存、请求内存的需要
4、执行内存可以借用尽可能多空闲的存储内存,但是执行内存由于执行操作所涉及的复杂性,执行内存永远不会被存储区逐出,也就是说如果执行内存已经占用存储内存的大部分空间,那么缓存块就会有可能失败,在这种情况下,根据存储级别的设置,新的块会被立即逐出内存
5、虽然存储内存和执行内存共享同一空间,但是会存在一个初始边界值,具体可见UnifiedMemoryManager.apply方法
统一内存模型对于堆外内存的设计和静态内存模型是一样的,这里不再重复介绍了
//源码实现-UnifiedMemoryManager
private def getMaxMemory(conf: SparkConf): Long = val systemMemory = confgetLong("spark.testing.memory" Runtime.getRuntime.maxMemory)//系统预留的内存大小,默认为300MBval reservedMemory = conf.getLong("spark.testing.reservedMemory"if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)//当前最小的内存需要308*1.5,即45MB,不满足条件就会退出val minSystemMemory = (reservedMemory * 1.5).ceil.toLongif (systemMemory < minSystemMemory) {throw new IllegalArgumentException(s"System memory $systemMemory must " +s"be at least $minsystemMemory. Please increase heap size using the " + s"--driver-memory option or spark.driver.memory in Spark configuration.")}// SPARK-12759 Check executor memory to fail fast if memory is insufficientif (conf.contains("spark.executor .memory")) val executorMemory = conf.getsizeAsBytes("spark.executor.memory")if (executorMemory < minSystemMemory) throw new IllegalArgumentException(s"Executor memory minSystemMemory, " + s"please increase executor memory using the ..."}val usableMemory = systemMemory - reservedMemory//当前Execution和storage共享的最大内存占比默认为@.6val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)(usableMemory * memoryFraction).toLong
}def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {val maxMemory = getMaxMemory(conf)new UnifiedMemoryManager(conf,maxHeapMemory = maxMemory,//通过配器参数spark.memory.storageFraction,//设置Execution和storage共享内存初始边界,即默认各占总内存一半onHeapStorageRegionsize = (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,numCores = numCores)
}