
[VL] Gluten OOM with multi-slot executor configuration due to the Vanilla Spark memory acquisition strategy #8128

Open
kecookier opened this issue Dec 3, 2024 · 5 comments · May be fixed by #8132
Labels
bug Something isn't working triage

Comments

@kecookier
Contributor

Backend

VL (Velox)

Bug description

After spilling a large amount of memory, the task still hits OOM. Detailed log:

24/12/03 09:46:23 INFO Executor task launch worker for task 1925 ColumnarShuffleWriter: Gluten shuffle writer: Trying to spill 2033398579 bytes of data
24/12/03 09:46:28 INFO Executor task launch worker for task 1925 ColumnarShuffleWriter: Gluten shuffle writer: Spilled 5991913856 / 2033398579 bytes of data
24/12/03 09:46:28 WARN Executor task launch worker for task 1925 TreeMemoryConsumer: TreeMemoryConsumer.spill end spilled:5991913856
24/12/03 09:46:29 ERROR Executor task launch worker for task 1925 ThrowOnOomMemoryTarget: Not enough spark off-heap execution memory. Acquired: 8.0 MiB, granted: 0.0 B. Try tweaking config option spark.memory.offHeap.size to get larger space to run this application (if spark.gluten.memory.dynamic.offHeap.sizing.enabled is not enabled). 
Current config settings: 
	spark.gluten.memory.offHeap.size.in.bytes=12.8 GiB
	spark.gluten.memory.task.offHeap.size.in.bytes=6.4 GiB
	spark.gluten.memory.conservative.task.offHeap.size.in.bytes=3.2 GiB
	spark.memory.offHeap.enabled=true
	spark.gluten.memory.dynamic.offHeap.sizing.enabled=false
Memory consumer stats: 
	Task.1925:                                             Current used bytes: 1144.0 MiB, peak bytes:        N/A
	\- Gluten.Tree.3:                                      Current used bytes: 1144.0 MiB, peak bytes:    8.8 GiB
	   \- root.3:                                          Current used bytes: 1144.0 MiB, peak bytes:    8.8 GiB
	      +- ShuffleWriter.3:                              Current used bytes:  744.0 MiB, peak bytes:    6.5 GiB
	      |  \- single:                                    Current used bytes:  744.0 MiB, peak bytes:    6.5 GiB
	      |     +- root:                                   Current used bytes:  719.1 MiB, peak bytes:  864.0 MiB
	      |     |  \- default_leaf:                        Current used bytes:  719.1 MiB, peak bytes:  859.2 MiB
	      |     \- gluten::MemoryAllocator:                Current used bytes:   29.5 MiB, peak bytes:    5.6 GiB
	      +- VeloxBatchAppender.3:                         Current used bytes:  216.0 MiB, peak bytes:  216.0 MiB
	      |  \- single:                                    Current used bytes:  216.0 MiB, peak bytes:  216.0 MiB
	      |     +- root:                                   Current used bytes:  134.3 MiB, peak bytes:  216.0 MiB
	      |     |  \- default_leaf:                        Current used bytes:  134.3 MiB, peak bytes:  215.9 MiB
	      |     \- gluten::MemoryAllocator:                Current used bytes:      0.0 B, peak bytes:      0.0 B
	      +- NativePlanEvaluator-4.0:                      Current used bytes:  176.0 MiB, peak bytes:  176.0 MiB
	      |  \- single:                                    Current used bytes:  176.0 MiB, peak bytes:  176.0 MiB
	      |     +- root:                                   Current used bytes:   29.1 MiB, peak bytes:  169.0 MiB
	      |     |  +- task.Gluten_Stage_2_TID_1925_VTID_4: Current used bytes:   29.1 MiB, peak bytes:  169.0 MiB
	      |     |  |  +- node.0:                           Current used bytes:   28.6 MiB, peak bytes:  168.0 MiB
	      |     |  |  |  +- op.0.0.0.TableScan:            Current used bytes:   28.6 MiB, peak bytes:  162.7 MiB
	      |     |  |  |  \- op.0.0.0.TableScan.test-hive:  Current used bytes:      0.0 B, peak bytes:      0.0 B
	      |     |  |  \- node.1:                           Current used bytes:  528.2 KiB, peak bytes: 1024.0 KiB
	      |     |  |     \- op.1.0.0.FilterProject:        Current used bytes:  528.2 KiB, peak bytes:  849.5 KiB
	      |     |  \- default_leaf:                        Current used bytes:      0.0 B, peak bytes:      0.0 B
	      |     \- gluten::MemoryAllocator:                Current used bytes:      0.0 B, peak bytes:      0.0 B
	      +- IndicatorVectorBase#init.3:                   Current used bytes:    8.0 MiB, peak bytes:    8.0 MiB
	      |  \- single:                                    Current used bytes:    8.0 MiB, peak bytes:    8.0 MiB
	      |     +- gluten::MemoryAllocator:                Current used bytes:      0.0 B, peak bytes:      0.0 B
	      |     \- root:                                   Current used bytes:      0.0 B, peak bytes:      0.0 B
	      |        \- default_leaf:                        Current used bytes:      0.0 B, peak bytes:      0.0 B
	      +- NativePlanEvaluator-4.0.OverAcquire.0:        Current used bytes:      0.0 B, peak bytes:   52.8 MiB
	      +- ShuffleWriter.3.OverAcquire.0:                Current used bytes:      0.0 B, peak bytes: 1982.4 MiB
	      +- IndicatorVectorBase#init.3.OverAcquire.0:     Current used bytes:      0.0 B, peak bytes:    2.4 MiB
	      +- VeloxBatchAppender.3.OverAcquire.0:           Current used bytes:      0.0 B, peak bytes:   64.8 MiB
	      \- ArrowContextInstance.3:                       Current used bytes:      0.0 B, peak bytes:      0.0 B

The Underlying Logic

Executor slots = 2, and we run Gluten in shared mode. In this mode, Gluten does not limit how much memory one task can use; that is left to Vanilla Spark's memory management, so the maximum per-task memory (maxPerTaskMem) is dynamic in a multi-slot environment.

In a multi-slot environment, the logic for Spark allocating memory to each task is as follows:

Assume slot = N, and the total execution off-heap memory for the executor is maxPoolSize. The maximum memory limit set by Spark for each task (maxPerTask) is dynamic and depends on the current number of tasks running in parallel (activeTaskNum). Spark ensures that the memory each task can request is:

  • minPerTask = poolSize / (2 * activeTaskNum)
  • maxPerTask = maxPoolSize / activeTaskNum

If the memory currently held by a task exceeds maxPerTask, any further memory requests will immediately return 0. This situation can easily occur in a multi-slot environment because activeTaskNum can change.

Each time Gluten requests memory, it calls Spark's memory request interface. When Spark returns 0, Gluten immediately considers it as an OOM.

For example, if slot = 8, consider the following timeline:

  1. Task1 is scheduled; at this time activeTaskNum = 1, and Task1 can request all the executor's off-heap memory. Suppose Task1 requests memory and gets m1 = 0.6 * M.
  2. Next, Task2 is scheduled; then activeTaskNum = 2, and Task2 requests memory, getting m2 = 0.1 * M.
  3. Then, Task1 requests an additional 0.1 * M, but at this point m1 > M / 2, so the request returns 0.
  4. Gluten determines that Task1 is OOM.
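The timeline above can be reproduced with a small model of the grant rule (a sketch, not Spark's actual implementation; it ignores the blocking/wait path and storage borrowing):

```python
def to_grant(num_bytes, cur_mem, active_tasks, max_pool_size, memory_free):
    """Cap a task's grant at maxPoolSize / numActiveTasks, as Spark does."""
    max_per_task = max_pool_size // active_tasks
    max_to_grant = min(num_bytes, max(0, max_per_task - cur_mem))
    return min(max_to_grant, memory_free)

M = 8 << 30  # hypothetical total executor off-heap execution memory

# 1. Task1 alone (activeTaskNum = 1) acquires 0.6 * M.
m1 = to_grant(int(0.6 * M), 0, 1, M, M)

# 2. Task2 is scheduled (activeTaskNum = 2) and acquires 0.1 * M.
m2 = to_grant(int(0.1 * M), 0, 2, M, M - m1)

# 3. Task1 asks for another 0.1 * M, but m1 > M / 2 = maxPerTask,
#    so the grant is 0 and Gluten treats it as OOM.
m3 = to_grant(int(0.1 * M), m1, 2, M, M - m1 - m2)
print(m1, m2, m3)  # m3 == 0
```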

Root Cause

In the real case, slot = 2 and executor off-heap = 12G. When activeTaskNum = 1, task 1925 holds 8.8G of off-heap memory. Then task 2007 is scheduled onto this executor, so maxPerTask becomes 12G / 2 = 6G. When task 1925 tries to acquire 8MB, by the logic described above, Spark returns 0. This triggers a spill of 8MB, but after the spill the memory held by the task is still larger than 6G, so Spark still returns 0. OverAcquireTarget then reserves 8.8G * 0.3, which triggers a spill of 2.64G; in practice the shuffle writer spills almost 5.5G, but it does not help: the acquisition still returns 0.
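Plugging the numbers from this case into the same bound (treating 12 GB as the pool size, as in the description above) shows why the 8 MiB request is refused:

```python
GB = 1 << 30
max_pool = 12 * GB            # executor off-heap execution pool (per the description)
max_per_task = max_pool // 2  # activeTaskNum = 2 after task 2007 arrives -> 6 GB
cur_mem = int(8.8 * GB)       # held by task 1925 at that moment

request = 8 << 20             # the failing 8 MiB acquisition
# cur_mem already exceeds max_per_task, so max(0, ...) clamps the grant to 0.
grant = min(request, max(0, max_per_task - cur_mem))
print(grant)  # 0
```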

How to Resolve?

  1. Gluten provides a spark.gluten.memory.isolation mode, in which a task can use at most (executor.offheap.size / slot * 0.5). This value is less than Vanilla Spark's maxPerTask, so up to (executor.offheap.size / slot * 0.5) can be wasted, because storage memory will be shrunk in Vanilla Spark.
  2. In our case we actually have enough memory available, but ThrowOnOomMemoryTarget does not obtain it. Perhaps we can retry the borrow in this situation; if the granted memory is still insufficient, retry after spilling as much memory as possible.
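Option 2 could be sketched roughly as follows. This is a hypothetical model, not Gluten code; `acquire_from_spark` and `spill` stand in for the real memory-target hooks:

```python
def acquire_with_retry(request, acquire_from_spark, spill, max_retries=3):
    """Retry a borrow instead of treating the first 0-byte grant as OOM.

    acquire_from_spark(n) -> bytes actually granted by Spark (may be 0).
    spill(n)              -> releases at least n bytes back to Spark if possible.
    Both callables are assumptions standing in for the real memory target.
    """
    granted = acquire_from_spark(request)
    retries = 0
    while granted < request and retries < max_retries:
        # Spill at least the shortfall; a real implementation might spill
        # everything spillable on the final attempt.
        spill(request - granted)
        granted += acquire_from_spark(request - granted)
        retries += 1
    return granted
```

For example, against a toy pool that starts with no free memory, a spill of the shortfall followed by a retry lets the acquisition succeed instead of reporting OOM.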

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

@FelixYBW
Contributor

FelixYBW commented Dec 5, 2024

How does Vanilla Spark act here?

I remember @zhztheplayer mentioned that when the first job takes all executor memory and a second job is scheduled, any memory allocation in the first job should spill its data to release memory for the second job. Is that right?

@kecookier
Contributor Author

How does Vanilla Spark act here?

I remember @zhztheplayer mentioned that when the first job takes all executor memory and a second job is scheduled, any memory allocation in the first job should spill its data to release memory for the second job. Is that right?

@FelixYBW If the first job holds all the executor memory, M, then after the second job is scheduled, when the first job requests an allocation of SIZE memory, it will spill SIZE memory. However, if (M - SIZE) > M/2, Vanilla Spark will still grant 0 for this request.
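With some illustrative numbers (M and SIZE are hypothetical), the condition above plays out like this:

```python
M = 12 << 30                 # total executor execution memory (illustrative)
size = 1 << 30               # the allocation that triggered the spill
held_after_spill = M - size  # 11 GB still held by the first job

# With two active tasks, maxMemoryPerTask = M / 2. Since the task still
# holds more than that even after spilling SIZE, Spark grants 0.
grant = min(size, max(0, M // 2 - held_after_spill))
print(grant)  # 0
```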

@kecookier
Contributor Author

The relevant Vanilla Spark code:
https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L131

private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => (),
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }

@zhztheplayer
Member

zhztheplayer commented Dec 9, 2024

when the first job requests an allocation of SIZE memory, it will spill SIZE memory.

Correct. Though some of the memory consumers may not respect SIZE and will spill as much as they hold. E.g., UnsafeExternalSorter.

if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask)

@kecookier It seems this code also performs retrying. Do this and the PR functionally overlap, more or less?

@kecookier
Contributor Author

if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask)

It seems this code also performs retrying. Do this and the PR functionally overlap, more or less?

@zhztheplayer Yes, Vanilla Spark only promises that each task can hold at least minMemoryPerTask (1/2N) of the executor's memory. If the task already holds more than maxMemoryPerTask (1/N), it will not retry.
