Spark 1.6 内存管理模型( Unified Memory Management)分析
前言
Memory Manager
spark.memory.useLegacyMode=false
决定的。如果采用1.6之前的模型,这会使用StaticMemoryManager来管理,否则使用新的。
UnifiedMemoryManager
- ExecutionMemory。这片内存区域是为了解决 shuffles,joins, sorts and aggregations 过程中为了避免频繁IO需要的buffer。 通过spark.shuffle.memoryFraction(默认 0.2) 配置。
- StorageMemory。这片内存区域是为了解决 block cache(就是你显示调用dd.cache, rdd.persist等方法), 还有就是broadcasts,以及task results的存储。可以通过参数 spark.storage.memoryFraction(默认0.6)。设置
- OtherMemory。给系统预留的,因为程序本身运行也是需要内存的。 (默认为0.2).
OtherMemory
acquireExecutionMemory
acquireStorageMemory
acquireExecutionMemory
- 如果ExecutionMemory 内存充足,则不会触发向Storage申请内存
- 每个Task能够被使用的内存被限制在 poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之间。
maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)
poolSize = ExecutionMemoryPool.poolSize (当前ExecutionMemoryPool 所持有的内存)
- 如果ExecutionMemory 的内存不足,则会触发向StorageMemory索引要内存的操作。
- 如果ExecutionMemory 的内存不足,则会向 StorageMemory要内存,具体怎么样呢? 看下面一句代码就懂了:
val memoryReclaimableFromStorage = math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
math.min(extraMemoryNeeded,memoryReclaimableFromStorage))
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
acquireStorageMemory
val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
MemoryPool
@GuardedBy("this")
protected val storageMemoryPool = new StorageMemoryPool(this)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
- 内存的借出借入
- task目前内存的使用跟踪
总结
- 理论上可以减少Shuffle spill数,极端情况可能中间就没有spill过程了,可以大大减少IO次数
- 如果你的内存太紧张,可能无法缓解问题
- 如果你的程序具有偏向性,比如重度ExectionMemory 或者StorageMemory 的某一个,则可能会带来比较好的效果