
Commit

Remove the config datafusion.execution.coalesce_target_batch_size and use datafusion.execution.batch_size instead
kyotoYaho committed Dec 28, 2022
1 parent 747b720 commit 0e86a8d
Showing 10 changed files with 37 additions and 43 deletions.
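
For downstream users, the practical effect of this commit is that the coalesce target is no longer a separate setting and simply follows the general batch size. A minimal sketch, assuming the DataFusion version of this commit and that SessionConfig and SessionContext come from the prelude as in the tests below:

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // Previously the coalesce target was set on its own, e.g.
        // SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234).
        // After this commit the single batch-size knob drives coalescing too.
        let config = SessionConfig::new().with_batch_size(1234);
        let _ctx = SessionContext::with_config(config);
    }
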
12 changes: 1 addition & 11 deletions datafusion/core/src/config.rs
@@ -50,10 +50,6 @@ pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
/// Configuration option "datafusion.execution.coalesce_batches"
pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";

/// Configuration option "datafusion.execution.coalesce_target_batch_size"
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Configuration option "datafusion.execution.collect_statistics"
pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics";

@@ -317,14 +313,8 @@ impl BuiltInConfigs {
small batches will be coalesced into larger batches. This is helpful when there \
are highly selective filters or joins that could produce tiny output batches. The \
target batch size is determined by the configuration setting \
'{}'.", OPT_COALESCE_TARGET_BATCH_SIZE),
'{}'.", OPT_BATCH_SIZE),
true,
),
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
ConfigDefinition::new_string(
OPT_TIME_ZONE,
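
With OPT_COALESCE_TARGET_BATCH_SIZE removed, OPT_BATCH_SIZE is the only remaining knob; its default shows up as 8192 elsewhere in this commit (the display doc-tests and the settings listing at the end). A minimal sketch of reading it back, assuming the public datafusion::config path and that DATAFUSION_EXECUTION_BATCH_SIZE is not set in the environment:

    use datafusion::config::{ConfigOptions, OPT_BATCH_SIZE};

    fn main() {
        // from_env() starts from the built-in defaults and only overrides keys
        // that have a matching environment variable set.
        let config = ConfigOptions::from_env();
        assert_eq!(config.get_u64(OPT_BATCH_SIZE).unwrap_or_default(), 8192);
    }
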
11 changes: 2 additions & 9 deletions datafusion/core/src/execution/context.rs
@@ -72,7 +72,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES,
OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
@@ -1545,14 +1545,7 @@ impl SessionState {
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
if config.coalesce_batches() {
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.unwrap_or_default()
.try_into()
.unwrap(),
)));
physical_optimizers.push(Arc::new(CoalesceBatches::new(config.batch_size())));
}

SessionState {
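
The SessionState change above replaces the get_u64(OPT_COALESCE_TARGET_BATCH_SIZE) round trip with a direct call to SessionConfig::batch_size(). A sketch of the resulting wiring, with the import paths assumed:

    use std::sync::Arc;

    use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
    use datafusion::prelude::SessionConfig;

    fn main() {
        let config = SessionConfig::new().with_batch_size(1234);
        // Same shape as the new optimizer registration: the coalesce rule is
        // built from the one batch_size() value shared by the whole session.
        if config.coalesce_batches() {
            let _rule = Arc::new(CoalesceBatches::new(config.batch_size()));
        }
    }
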
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -301,7 +301,7 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE};
use crate::config::OPT_COALESCE_BATCHES;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::projection::ProjectionExec;
@@ -312,9 +312,7 @@ mod tests {

#[tokio::test]
async fn test_custom_batch_size() -> Result<()> {
let ctx = SessionContext::with_config(
SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234),
);
let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(1234));
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
let coalesce = projection
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/display.rs
@@ -74,7 +74,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
///
/// ```text
/// ProjectionExec: expr=[a]
/// CoalesceBatchesExec: target_batch_size=4096
/// CoalesceBatchesExec: target_batch_size=8192
/// FilterExec: a < 5
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
@@ -329,7 +329,7 @@ pub fn with_new_children_if_necessary(
/// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR");
///
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=4096\
/// \n CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None, projection=[a]",
13 changes: 6 additions & 7 deletions datafusion/core/tests/config_from_env.rs
@@ -32,18 +32,17 @@ fn get_config_bool_from_env() {
fn get_config_int_from_env() {
let config_key = "datafusion.execution.batch_size";
let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";

// for valid testing
env::set_var(env_key, "4096");
let config = ConfigOptions::from_env();
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096);
}

#[test]
fn get_config_int_from_env_invalid() {
let config_key = "datafusion.execution.coalesce_target_batch_size";
let env_key = "DATAFUSION_EXECUTION_COALESCE_TARGET_BATCH_SIZE";
// for invalid testing
env::set_var(env_key, "abc");
let config = ConfigOptions::from_env();
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 8192); // set to its default value

// To avoid influence other testing, we need to clear this environment variable
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); // set to its default value
}
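
The two key/env-variable pairs in this test follow the usual mapping: dots become underscores and the result is upper-cased. A tiny illustration with a hypothetical helper (env_key_for is not part of DataFusion):

    // Hypothetical helper, only to spell out the naming convention the test
    // above relies on.
    fn env_key_for(config_key: &str) -> String {
        config_key.to_uppercase().replace('.', "_")
    }

    fn main() {
        assert_eq!(
            env_key_for("datafusion.execution.batch_size"),
            "DATAFUSION_EXECUTION_BATCH_SIZE"
        );
    }
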
14 changes: 10 additions & 4 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -25,7 +25,9 @@ use datafusion::{
async fn explain_analyze_baseline_metrics() {
// This test uses the execute function to run an actual plan under EXPLAIN ANALYZE
// and then validate the presence of baseline metrics for supported operators
let config = SessionConfig::new().with_target_partitions(3);
let config = SessionConfig::new()
.with_target_partitions(3)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
register_aggregate_csv_by_sql(&ctx).await;
// a query with as many operators as we have metrics for
@@ -668,7 +670,9 @@ order by
#[tokio::test]
async fn test_physical_plan_display_indent() {
// Hard code target_partitions as it appears in the RepartitionExec output
let config = SessionConfig::new().with_target_partitions(9000);
let config = SessionConfig::new()
.with_target_partitions(9000)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await.unwrap();
let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
@@ -711,7 +715,9 @@
#[tokio::test]
async fn test_physical_plan_display_indent_multi_children() {
// Hard code target_partitions as it appears in the RepartitionExec output
let config = SessionConfig::new().with_target_partitions(9000);
let config = SessionConfig::new()
.with_target_partitions(9000)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
// ensure indenting works for nodes with multiple children
register_aggregate_csv(&ctx).await.unwrap();
@@ -760,7 +766,7 @@ async fn test_physical_plan_display_indent_multi_children() {
async fn csv_explain() {
// This test uses the execute function that create full plan cycle: logical, optimized logical, and physical,
// then execute the physical plan and return the final explain results
let ctx = SessionContext::new();
let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(4096));
register_aggregate_csv_by_sql(&ctx).await;
let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > cast(10 as int)";
let actual = execute(&ctx, sql).await;
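
These tests now pin the batch size explicitly because their expected EXPLAIN output embeds lines such as "CoalesceBatchesExec: target_batch_size=4096"; with the target now following batch_size (default 8192) rather than the removed 4096 default, the pin keeps the recorded plan strings stable. A sketch of the pinned configuration, import path assumed:

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // Both values appear verbatim in the expected plan text, so they are
        // hard-coded rather than left at their defaults.
        let config = SessionConfig::new()
            .with_target_partitions(9000)
            .with_batch_size(4096);
        let _ctx = SessionContext::with_config(config);
    }
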
12 changes: 8 additions & 4 deletions datafusion/core/tests/sql/mod.rs
@@ -187,7 +187,8 @@ fn create_join_context(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
@@ -241,7 +242,8 @@ fn create_left_semi_anti_join_context_with_null_ids(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
@@ -313,7 +315,8 @@ fn create_right_semi_anti_join_context_with_null_ids(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
@@ -618,7 +621,8 @@ fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
let ctx = SessionContext::with_config(
SessionConfig::new()
.set_bool(OPT_PREFER_HASH_JOIN, false)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Schema::new(vec![
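
The join-context builders above all repeat the same pinning of partitions and batch size. A hypothetical shared helper (not part of this commit) that captures the pattern, import path assumed:

    use datafusion::prelude::{SessionConfig, SessionContext};

    // Hypothetical helper: one place for the settings the expected plans in
    // these tests depend on.
    fn pinned_join_context(repartition_joins: bool) -> SessionContext {
        SessionContext::with_config(
            SessionConfig::new()
                .with_repartition_joins(repartition_joins)
                .with_target_partitions(2)
                .with_batch_size(4096),
        )
    }

    fn main() {
        let _ctx = pinned_join_context(true);
    }
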
7 changes: 6 additions & 1 deletion datafusion/core/tests/sql/window.rs
@@ -1743,7 +1743,11 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>

#[tokio::test]
async fn test_window_partition_by_order_by() -> Result<()> {
let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_target_partitions(2)
.with_batch_size(4096),
);
register_aggregate_csv(&ctx).await?;

let sql = "SELECT \
@@ -2253,6 +2257,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
let config = SessionConfig::new()
.with_target_partitions(8)
.with_batch_size(4096)
.with_repartition_windows(true);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
@@ -108,7 +108,6 @@ datafusion.catalog.location NULL
datafusion.catalog.type NULL
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.coalesce_target_batch_size 4096
datafusion.execution.collect_statistics false
datafusion.execution.parquet.enable_page_index false
datafusion.execution.parquet.metadata_size_hint NULL
