'Spark內存管理之三:UnifiedMemoryManager分析'

Spark 若澤大數據 2019-08-09
"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們點進去後會發現,會調用ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們點進去後會發現,會調用ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們可以發現每Task能夠被使用的內存被限制在:

poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之間

val maxMemoryPerTask = maxPoolSize /numActiveTasks

val minMemoryPerTask = poolSize / (2 * numActiveTasks)

UnifiedMemoryManager.scala中

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們點進去後會發現,會調用ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們可以發現每Task能夠被使用的內存被限制在:

poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之間

val maxMemoryPerTask = maxPoolSize /numActiveTasks

val minMemoryPerTask = poolSize / (2 * numActiveTasks)

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

其中maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)

maxMemory = storage + execution的最大內存

poolSize = 當前這個pool的大小

maxPoolSize = execution pool的最大內存

UnifiedMemoryManager.scala中

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們點進去後會發現,會調用ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們可以發現每Task能夠被使用的內存被限制在:

poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之間

val maxMemoryPerTask = maxPoolSize /numActiveTasks

val minMemoryPerTask = poolSize / (2 * numActiveTasks)

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

其中maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)

maxMemory = storage + execution的最大內存

poolSize = 當前這個pool的大小

maxPoolSize = execution pool的最大內存

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

從上述代碼中我們可以知道索要內存的大小:

val memoryReclaimableFromStorage=math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize -storageRegionSize)

取決於StorageMemoryPool的剩餘內存和 storageMemoryPool 從ExecutionMemory借來的內存哪個大,取最大的那個,作為可以重新歸還的最大內存

用公式表達出來就是這一個樣子:

ExecutionMemory 能借到的最大內存 = StorageMemory 借的內存 + StorageMemory 空閒內存

注意:如果實際需要的小於能夠借到的最大值,則以實際需要值為準

能回收的內存大小為:

val spaceToReclaim =storageMemoryPool.freeSpaceToShrinkPool ( math.min(extraMemoryNeeded,memoryReclaimableFromStorage))

ExecutionMemoryPool.acquireMemory()解析

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們點進去後會發現,會調用ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們可以發現每Task能夠被使用的內存被限制在:

poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之間

val maxMemoryPerTask = maxPoolSize /numActiveTasks

val minMemoryPerTask = poolSize / (2 * numActiveTasks)

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

其中maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)

maxMemory = storage + execution的最大內存

poolSize = 當前這個pool的大小

maxPoolSize = execution pool的最大內存

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

從上述代碼中我們可以知道索要內存的大小:

val memoryReclaimableFromStorage=math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize -storageRegionSize)

取決於StorageMemoryPool的剩餘內存和 storageMemoryPool 從ExecutionMemory借來的內存哪個大,取最大的那個,作為可以重新歸還的最大內存

用公式表達出來就是這一個樣子:

ExecutionMemory 能借到的最大內存 = StorageMemory 借的內存 + StorageMemory 空閒內存

注意:如果實際需要的小於能夠借到的最大值,則以實際需要值為準

能回收的內存大小為:

val spaceToReclaim =storageMemoryPool.freeSpaceToShrinkPool ( math.min(extraMemoryNeeded,memoryReclaimableFromStorage))

ExecutionMemoryPool.acquireMemory()解析

Spark內存管理之三:UnifiedMemoryManager分析

整體流程解析:

程序一直處理該task的請求,直到系統判定無法滿足該請求或者已經為該請求分配到足夠的內存為止;如果當前execution內存池剩餘內存不足以滿足此次請求時,會向storage部分請求釋放出被借走的內存以滿足此次請求

根據此刻execution內存池的總大小maxPoolSize,以及從memoryForTask中統計出的處於active狀態的task的個數計算出:

每個task能夠得到的最大內存數 maxMemoryPerTask = maxPoolSize / numActiveTasks

每個task能夠得到的最少內存數 minMemoryPerTask = poolSize /(2 * numActiveTasks)

根據申請內存的task當前使用的execution內存大小決定分配給該task多少內存,總的內存不能超過maxMemoryPerTask;但是如果execution內存池能夠分配的最大內存小於numBytes,並且如果把能夠分配的內存分配給當前task,但是該task最終得到的execution內存還是小於minMemoryPerTask時,該task進入等待狀態,等其他task申請內存時再將其喚醒,喚醒之後如果此時滿足,就會返回能夠分配的內存數,並且更新memoryForTask,將該task使用的內存調整為分配後的值

一個Task最少需要minMemoryPerTask才能開始執行

acquireStorageMemory方法

流程和acquireExecutionMemory類似,當storage的內存不足時,同樣會向execution借內存,但區別是當且僅當ExecutionMemory有空閒內存時,StorageMemory 才能借走該內存

UnifiedMemoryManager.scala中

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們點進去後會發現,會調用ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們可以發現每Task能夠被使用的內存被限制在:

poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之間

val maxMemoryPerTask = maxPoolSize /numActiveTasks

val minMemoryPerTask = poolSize / (2 * numActiveTasks)

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

其中maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)

maxMemory = storage + execution的最大內存

poolSize = 當前這個pool的大小

maxPoolSize = execution pool的最大內存

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

從上述代碼中我們可以知道索要內存的大小:

val memoryReclaimableFromStorage=math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize -storageRegionSize)

取決於StorageMemoryPool的剩餘內存和 storageMemoryPool 從ExecutionMemory借來的內存哪個大,取最大的那個,作為可以重新歸還的最大內存

用公式表達出來就是這一個樣子:

ExecutionMemory 能借到的最大內存 = StorageMemory 借的內存 + StorageMemory 空閒內存

注意:如果實際需要的小於能夠借到的最大值,則以實際需要值為準

能回收的內存大小為:

val spaceToReclaim =storageMemoryPool.freeSpaceToShrinkPool ( math.min(extraMemoryNeeded,memoryReclaimableFromStorage))

ExecutionMemoryPool.acquireMemory()解析

Spark內存管理之三:UnifiedMemoryManager分析

整體流程解析:

程序一直處理該task的請求,直到系統判定無法滿足該請求或者已經為該請求分配到足夠的內存為止;如果當前execution內存池剩餘內存不足以滿足此次請求時,會向storage部分請求釋放出被借走的內存以滿足此次請求

根據此刻execution內存池的總大小maxPoolSize,以及從memoryForTask中統計出的處於active狀態的task的個數計算出:

每個task能夠得到的最大內存數 maxMemoryPerTask = maxPoolSize / numActiveTasks

每個task能夠得到的最少內存數 minMemoryPerTask = poolSize /(2 * numActiveTasks)

根據申請內存的task當前使用的execution內存大小決定分配給該task多少內存,總的內存不能超過maxMemoryPerTask;但是如果execution內存池能夠分配的最大內存小於numBytes,並且如果把能夠分配的內存分配給當前task,但是該task最終得到的execution內存還是小於minMemoryPerTask時,該task進入等待狀態,等其他task申請內存時再將其喚醒,喚醒之後如果此時滿足,就會返回能夠分配的內存數,並且更新memoryForTask,將該task使用的內存調整為分配後的值

一個Task最少需要minMemoryPerTask才能開始執行

acquireStorageMemory方法

流程和acquireExecutionMemory類似,當storage的內存不足時,同樣會向execution借內存,但區別是當且僅當ExecutionMemory有空閒內存時,StorageMemory 才能借走該內存

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

從上述代碼中我們可以知道能借到的內存數為:

val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree,numBytes)

所以StorageMemory從ExecutionMemory借走的內存,完全取決於當時ExecutionMemory是不是有空閒內存;借到內存後,storageMemoryPool增加借到的這部分內存,之後同上一樣,會調用StorageMemoryPool的acquireMemory()方法

StorageMemoryPool.acquireMemory

"

acquireExecutionMemory方法

UnifiedMemoryManager中的accquireExecutionMemory方法:

Spark內存管理之三:UnifiedMemoryManager分析

當前的任務嘗試從executor中獲取numBytes這麼大的內存

該方法直接向ExecutionMemoryPool索要所需內存,索要內存有以下幾個關注點:

  • 當ExecutionMemory 內存充足,則不會觸發向Storage申請內存
  • 每個Task能夠被使用的內存是被限制的
  • 索要內存的大小

我們通過源碼來進行分析

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們點進去後會發現,會調用ExecutionMemoryPool.acquireMemory()方法

ExecutionMemoryPool.scala中

Spark內存管理之三:UnifiedMemoryManager分析

我們可以發現每Task能夠被使用的內存被限制在:

poolSize / (2 * numActiveTasks) ~ maxPoolSize / numActiveTasks 之間

val maxMemoryPerTask = maxPoolSize /numActiveTasks

val minMemoryPerTask = poolSize / (2 * numActiveTasks)

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

其中maxPoolSize = maxMemory - math.min(storageMemoryUsed, storageRegionSize)

maxMemory = storage + execution的最大內存

poolSize = 當前這個pool的大小

maxPoolSize = execution pool的最大內存

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

從上述代碼中我們可以知道索要內存的大小:

val memoryReclaimableFromStorage=math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize -storageRegionSize)

取決於StorageMemoryPool的剩餘內存和 storageMemoryPool 從ExecutionMemory借來的內存哪個大,取最大的那個,作為可以重新歸還的最大內存

用公式表達出來就是這一個樣子:

ExecutionMemory 能借到的最大內存 = StorageMemory 借的內存 + StorageMemory 空閒內存

注意:如果實際需要的小於能夠借到的最大值,則以實際需要值為準

能回收的內存大小為:

val spaceToReclaim =storageMemoryPool.freeSpaceToShrinkPool ( math.min(extraMemoryNeeded,memoryReclaimableFromStorage))

ExecutionMemoryPool.acquireMemory()解析

Spark內存管理之三:UnifiedMemoryManager分析

整體流程解析:

程序一直處理該task的請求,直到系統判定無法滿足該請求或者已經為該請求分配到足夠的內存為止;如果當前execution內存池剩餘內存不足以滿足此次請求時,會向storage部分請求釋放出被借走的內存以滿足此次請求

根據此刻execution內存池的總大小maxPoolSize,以及從memoryForTask中統計出的處於active狀態的task的個數計算出:

每個task能夠得到的最大內存數 maxMemoryPerTask = maxPoolSize / numActiveTasks

每個task能夠得到的最少內存數 minMemoryPerTask = poolSize /(2 * numActiveTasks)

根據申請內存的task當前使用的execution內存大小決定分配給該task多少內存,總的內存不能超過maxMemoryPerTask;但是如果execution內存池能夠分配的最大內存小於numBytes,並且如果把能夠分配的內存分配給當前task,但是該task最終得到的execution內存還是小於minMemoryPerTask時,該task進入等待狀態,等其他task申請內存時再將其喚醒,喚醒之後如果此時滿足,就會返回能夠分配的內存數,並且更新memoryForTask,將該task使用的內存調整為分配後的值

一個Task最少需要minMemoryPerTask才能開始執行

acquireStorageMemory方法

流程和acquireExecutionMemory類似,當storage的內存不足時,同樣會向execution借內存,但區別是當且僅當ExecutionMemory有空閒內存時,StorageMemory 才能借走該內存

UnifiedMemoryManager.scala中

Spark內存管理之三:UnifiedMemoryManager分析

從上述代碼中我們可以知道能借到的內存數為:

val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree,numBytes)

所以StorageMemory從ExecutionMemory借走的內存,完全取決於當時ExecutionMemory是不是有空閒內存;借到內存後,storageMemoryPool增加借到的這部分內存,之後同上一樣,會調用StorageMemoryPool的acquireMemory()方法

StorageMemoryPool.acquireMemory

Spark內存管理之三:UnifiedMemoryManager分析

"

相關推薦

推薦中...