Reduce duplication in file scan tests (#2533)
tustvold authored May 16, 2022
1 parent a825891 commit 4d6428c
Showing 10 changed files with 118 additions and 352 deletions.
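Most of the deleted lines below are copies of the same `CsvExec::new(FileScanConfig { .. })` boilerplate. The commit replaces them with two shared helpers, `test::partitioned_csv_config` and `test::scan_partitioned_csv`, whose definitions sit in a changed file that is not rendered on this page. The sketch below is therefore inferred from the call sites in the diffs: the names and signatures match their uses, but the bodies are assumptions.

```rust
// Assumed helper bodies, reconstructed from call sites in the diffs below.
// `create_partitioned_csv` is the pre-existing helper the old tests called
// directly; `Statistics`, `FileScanConfig`, and `CsvExec` are DataFusion types.
use std::sync::Arc;

use arrow::datatypes::SchemaRef;

use crate::datafusion_data_access::object_store::local::LocalFileSystem;
use crate::error::Result;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::Statistics;
use crate::test_util::aggr_test_schema;

/// Default scan config for `filename`, split into `partitions` file groups,
/// with the same field values the deleted struct literals spelled out by hand.
pub fn partitioned_csv_config(
    filename: &str,
    file_schema: SchemaRef,
    partitions: usize,
) -> Result<FileScanConfig> {
    let (_, file_groups) = create_partitioned_csv(filename, partitions)?;
    Ok(FileScanConfig {
        object_store: Arc::new(LocalFileSystem {}),
        file_schema,
        file_groups,
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
    })
}

/// Ready-made CSV scan over "aggregate_test_100.csv". Returning an `Arc`
/// is what lets the call sites drop their own `Arc::new(csv)` wrappers.
pub fn scan_partitioned_csv(partitions: usize) -> Result<Arc<CsvExec>> {
    let config =
        partitioned_csv_config("aggregate_test_100.csv", aggr_test_schema(), partitions)?;
    Ok(Arc::new(CsvExec::new(config, true, b',')))
}
```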
22 changes: 2 additions & 20 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -209,41 +209,23 @@ mod tests {
     use futures::FutureExt;

     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{collect, common};
     use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
-    use crate::test_util;

     #[tokio::test]
     async fn merge() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let schema = test_util::aggr_test_schema();

         let num_partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: schema,
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(num_partitions)?;

         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

-        let merge = CoalescePartitionsExec::new(Arc::new(csv));
+        let merge = CoalescePartitionsExec::new(csv);

         // output of CoalescePartitionsExec should have a single partition
         assert_eq!(merge.output_partitioning().partition_count(), 1);
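The `CoalescePartitionsExec::new(csv)` change above also shows why no `Arc::new` wrapper is needed anymore: an `Arc` of a concrete type unsize-coerces to an `Arc` of a trait object at the call site. A self-contained toy illustration (the trait and types are stand-ins, not DataFusion's real ones):

```rust
use std::sync::Arc;

// Stand-ins for DataFusion's ExecutionPlan trait and CsvExec node.
trait ExecutionPlan {
    fn name(&self) -> &str;
}

struct CsvScan;

impl ExecutionPlan for CsvScan {
    fn name(&self) -> &str {
        "CsvScan"
    }
}

// Like CoalescePartitionsExec::new, accepts the plan as a trait object.
fn coalesce(input: Arc<dyn ExecutionPlan>) -> String {
    format!("CoalescePartitions({})", input.name())
}

fn main() {
    // Like scan_partitioned_csv, hand back a concrete Arc<CsvScan>...
    let csv: Arc<CsvScan> = Arc::new(CsvScan);
    // ...and pass it straight through: Arc<CsvScan> coerces to
    // Arc<dyn ExecutionPlan>, so no second Arc::new at the call site.
    assert_eq!(coalesce(csv), "CoalescePartitions(CsvScan)");
}
```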
92 changes: 25 additions & 67 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -212,9 +212,8 @@ pub async fn plan_to_csv(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
-    use crate::datasource::listing::local_unpartitioned_file;
     use crate::prelude::*;
+    use crate::test::partitioned_csv_config;
     use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
     use arrow::datatypes::*;
@@ -228,22 +227,11 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
-        let path = format!("{}/csv/{}", testdata, filename);
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![local_unpartitioned_file(path)]],
-                statistics: Statistics::default(),
-                projection: Some(vec![0, 2, 4]),
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+        config.projection = Some(vec![0, 2, 4]);
+
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
@@ -275,22 +263,11 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
-        let path = format!("{}/csv/{}", testdata, filename);
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![local_unpartitioned_file(path)]],
-                statistics: Statistics::default(),
-                projection: None,
-                limit: Some(5),
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+        config.limit = Some(5);
+
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(13, csv.projected_schema.fields().len());
         assert_eq!(13, csv.schema().fields().len());
@@ -322,22 +299,11 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema_with_missing_col();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
-        let path = format!("{}/csv/{}", testdata, filename);
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![local_unpartitioned_file(path)]],
-                statistics: Statistics::default(),
-                projection: None,
-                limit: Some(5),
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+        config.limit = Some(5);
+
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(14, csv.base_config.file_schema.fields().len());
         assert_eq!(14, csv.projected_schema.fields().len());
         assert_eq!(14, csv.schema().fields().len());
@@ -369,29 +335,21 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
+        let mut config = partitioned_csv_config(filename, file_schema.clone(), 1)?;
+
+        // Add partition columns
+        config.table_partition_cols = vec!["date".to_owned()];
+        config.file_groups[0][0].partition_values =
+            vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
+
+        // We should be able to project on the partition column
+        // Which is supposed to be after the file fields
+        config.projection = Some(vec![0, file_schema.fields().len()]);
+
         // we don't have `/date=xx/` in the path but that is ok because
         // partitions are resolved during scan anyway
-        let path = format!("{}/csv/{}", testdata, filename);
-        let mut partitioned_file = local_unpartitioned_file(path);
-        partitioned_file.partition_values =
-            vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
-        let csv = CsvExec::new(
-            FileScanConfig {
-                // we should be able to project on the partition column
-                // wich is supposed to be after the file fields
-                projection: Some(vec![0, file_schema.fields().len()]),
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![partitioned_file]],
-                statistics: Statistics::default(),
-                limit: None,
-                table_partition_cols: vec!["date".to_owned()],
-            },
-            true,
-            b',',
-        );
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(2, csv.projected_schema.fields().len());
         assert_eq!(2, csv.schema().fields().len());
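All four csv.rs tests above now share one construction path: build the default config with the helper, then override only the field under test. A condensed sketch of the pattern (crate-internal APIs; the comments naming `CsvExec::new`'s arguments are inferred, not taken from its docs):

```rust
// Build once, then override the single field each test cares about.
let mut config = partitioned_csv_config("aggregate_test_100.csv", aggr_test_schema(), 1)?;

// projection test:        config.projection = Some(vec![0, 2, 4]);
// limit tests:            config.limit = Some(5);
// partition-column test:  config.table_partition_cols = vec!["date".to_owned()];
config.projection = Some(vec![0, 2, 4]);

let csv = CsvExec::new(config, true, b','); // assumed: has_header = true, delimiter = b','
assert_eq!(3, csv.projected_schema.fields().len());
```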
37 changes: 3 additions & 34 deletions datafusion/core/src/physical_plan/filter.rs
@@ -235,9 +235,7 @@ impl RecordBatchStream for FilterExecStream {
 mod tests {

     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::*;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::ExecutionPlan;
     use crate::physical_plan::{collect, with_new_children_if_necessary};
     use crate::prelude::SessionContext;
@@ -254,22 +252,7 @@ mod tests {
         let schema = test_util::aggr_test_schema();

         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(partitions)?;

         let predicate: Arc<dyn PhysicalExpr> = binary(
             binary(
@@ -289,7 +272,7 @@ mod tests {
         )?;

         let filter: Arc<dyn ExecutionPlan> =
-            Arc::new(FilterExec::try_new(predicate, Arc::new(csv))?);
+            Arc::new(FilterExec::try_new(predicate, csv)?);

         let results = collect(filter, task_ctx).await?;

@@ -307,21 +290,7 @@ mod tests {
     async fn with_new_children() -> Result<()> {
         let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-        let input = Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        ));
+        let input = test::scan_partitioned_csv(partitions)?;

         let predicate: Arc<dyn PhysicalExpr> = binary(
             col("c2", &schema)?,
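Both filter tests still build their predicates with the physical-expression helpers (`binary`, `col`) visible in the context lines above. For readers of the truncated hunks, here is a sketch of such a predicate, roughly `c2 > 1 AND c2 < 4`, under this era's API; the literal values are illustrative assumptions and the `Operator`/`lit` import paths may differ:

```rust
use crate::logical_plan::Operator;
use crate::physical_plan::expressions::{binary, col, lit};
use crate::scalar::ScalarValue;

// Compose comparisons bottom-up, validating each against the input schema.
let predicate: Arc<dyn PhysicalExpr> = binary(
    binary(
        col("c2", &schema)?,
        Operator::Gt,
        lit(ScalarValue::UInt32(Some(1))),
        &schema,
    )?,
    Operator::And,
    binary(
        col("c2", &schema)?,
        Operator::Lt,
        lit(ScalarValue::UInt32(Some(4))),
        &schema,
    )?,
    &schema,
)?;

// As in the diff above: the Arc<CsvExec> from scan_partitioned_csv feeds
// FilterExec directly.
let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(predicate, csv)?);
```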
25 changes: 3 additions & 22 deletions datafusion/core/src/physical_plan/limit.rs
@@ -426,42 +426,23 @@ mod tests {
     use common::collect;

     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::prelude::SessionContext;
-    use crate::{test, test_util};
+    use crate::test;

     #[tokio::test]
     async fn limit() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let schema = test_util::aggr_test_schema();

         let num_partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: schema,
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(num_partitions)?;

         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

-        let limit =
-            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);
+        let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 7);

         // the result should contain 4 batches (one per input partition)
         let iter = limit.execute(0, task_ctx)?;
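The `limit` test is cut off right after `limit.execute(0, task_ctx)?`. Given the plan it builds, a `GlobalLimitExec` of 7 rows over one coalesced partition, a plausible continuation (a sketch, not the commit's actual code) drains the stream and checks the row count:

```rust
// Collect the single output partition and count rows across batches.
let batches = common::collect(iter).await?;
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(row_count, 7); // GlobalLimitExec::new(_, 7) caps the output at 7 rows
```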
25 changes: 3 additions & 22 deletions datafusion/core/src/physical_plan/projection.rs
@@ -295,9 +295,7 @@ impl RecordBatchStream for ProjectionStream {
 mod tests {

     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::{self, col};
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use crate::test::{self};
@@ -311,28 +309,11 @@ mod tests {
         let schema = test_util::aggr_test_schema();

         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(partitions)?;

         // pick column c1 and name it column c1 in the output schema
-        let projection = ProjectionExec::try_new(
-            vec![(col("c1", &schema)?, "c1".to_string())],
-            Arc::new(csv),
-        )?;
+        let projection =
+            ProjectionExec::try_new(vec![(col("c1", &schema)?, "c1".to_string())], csv)?;

         let col_field = projection.schema.field(0);
         let col_metadata = col_field.metadata().unwrap().clone();
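`ProjectionExec::try_new` pairs each physical expression with an output column name, which is why the test passes `(col("c1", &schema)?, "c1".to_string())`: the same constructor also renames columns. A hedged sketch (the rename string is illustrative):

```rust
// Project column c1 but expose it under a different output name. (Sketch.)
let projection = ProjectionExec::try_new(
    vec![(col("c1", &schema)?, "c1_renamed".to_string())],
    csv,
)?;
assert_eq!("c1_renamed", projection.schema().field(0).name());
```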
(The remaining 5 changed files were not rendered on this page; presumably they include the crate's test module, where partitioned_csv_config and scan_partitioned_csv are defined.)
