diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md
index 30ada4c9b..b1838ca80 100644
--- a/docs/source/user-guide/tuning.md
+++ b/docs/source/user-guide/tuning.md
@@ -23,40 +23,12 @@ Comet provides some tuning options to help you get the best performance from you
 
 ## Memory Tuning
 
-Comet provides two options for memory management:
-
-- **Unified Memory Management** shares an off-heap memory pool between Spark and Comet. This is the recommended option.
-- **Native Memory Management** leverages DataFusion's memory management for the native plans and allocates memory independently of Spark.
-
-### Unified Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=true`.
+Comet shares an off-heap memory pool with Spark. This requires setting `spark.memory.offHeap.enabled=true`.
+If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.
 
 Each executor will have a single memory pool which will be shared by all native plans being executed within that
 process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.
 
-### Native Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
-
-Each native plan has a dedicated memory pool.
-
-By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
-for `spark.comet.memory.overhead.factor` is `0.2`.
-
-It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
-be calculated with `spark.executor.cores / spark.task.cpus`.
-
-For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
-`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.
-
-It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
-it based on `spark.comet.memory.overhead.factor`.
-
-If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` are set, the former will be used.
-
-Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
-
 ### Determining How Much Memory to Allocate
 
 Generally, increasing memory overhead will improve query performance, especially for queries containing joins and
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index d7e8ccaba..47d87fe1a 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -202,27 +202,9 @@ fn prepare_datafusion_session_context(
 
     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
 
-    // Check if we are using unified memory manager integrated with Spark. Default to false if not
-    // set.
-    let use_unified_memory_manager = parse_bool(conf, "use_unified_memory_manager")?;
-
-    if use_unified_memory_manager {
-        // Set Comet memory pool for native
-        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
-    } else {
-        // Use the memory pool from DF
-        if conf.contains_key("memory_limit") {
-            let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
-            let memory_fraction = conf
-                .get("memory_fraction")
-                .ok_or(CometError::Internal(
-                    "Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
-                ))?
-                .parse::<f64>()?;
-            rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
-        }
-    }
+    // Set Comet memory pool for native
+    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
 
     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 07dd80c39..b1f22726a 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -75,15 +75,6 @@ class CometExecIterator(
     val result = new java.util.HashMap[String, String]()
     val conf = SparkEnv.get.conf
 
-    val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
-    // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
-    // we'll use the built-in memory pool from DF, and initializes with `memory_limit`
-    // and `memory_fraction` below.
-    result.put(
-      "use_unified_memory_manager",
-      String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
-    result.put("memory_limit", String.valueOf(maxMemory))
-    result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
     result.put("explain_native", String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 6a5c0efea..1c4ffcf3e 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -915,6 +915,13 @@ class CometSparkSessionExtensions
     }
 
     override def apply(plan: SparkPlan): SparkPlan = {
+
+      // Comet requires off-heap memory to be enabled
+      if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
+        logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
+        return plan
+      }
+
       // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
       // enabled.
       if (isANSIEnabled(conf)) {
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index a553e61c7..16a7e5338 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.sql.TPCDSBase
 import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.util.resourceToString
@@ -286,6 +287,8 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
     conf.set(
       "spark.shuffle.manager",
       "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+    conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
    conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
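
The sketch below is not part of the patch; it only illustrates how an application might configure a SparkSession so that the new off-heap requirement is satisfied. The off-heap size is an arbitrary example value, and the literal config keys `spark.comet.enabled` and `spark.comet.exec.enabled` are assumptions inferred from the `CometConf.COMET_ENABLED` and `CometConf.COMET_EXEC_ENABLED` constants used in the test changes; the extension class name is taken from the modified `CometSparkSessionExtensions.scala`.

```scala
// Illustrative only (not part of this patch): configure Spark so the Comet extension
// is not disabled by the new off-heap check.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("comet-offheap-example")
  // Register the Comet extension (class name from the file modified in this diff).
  .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
  // Off-heap memory is now required; with it disabled, the extension logs a message
  // and returns the Spark plan unchanged.
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "4g") // illustrative size; tune for your workload
  // Keys assumed from CometConf.COMET_ENABLED / COMET_EXEC_ENABLED referenced in the tests.
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  .getOrCreate()
```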
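A rough way to observe the new fallback behavior is to look for Comet operators in the physical plan string: when `spark.memory.offHeap.enabled` is false, none should appear. This is only a heuristic string match, not an official API, and it assumes the `spark` session from the previous sketch.

```scala
// Heuristic check: with off-heap memory disabled, the new guard returns the Spark plan
// untouched, so no Comet operators should show up in the executed plan.
val df = spark.range(1000000).selectExpr("id % 10 AS k").groupBy("k").count()
df.collect() // force planning and execution
val usesComet = df.queryExecution.executedPlan.toString.contains("Comet")
println(s"Physical plan contains Comet operators: $usesComet")
```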