Reduce duplication in file scan tests #2533

Merged
merged 1 commit on May 16, 2022
22 changes: 2 additions & 20 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -209,41 +209,23 @@ mod tests {
     use futures::FutureExt;
 
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{collect, common};
     use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
-    use crate::test_util;
 
     #[tokio::test]
     async fn merge() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let schema = test_util::aggr_test_schema();
 
         let num_partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: schema,
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(num_partitions)?;
 
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let merge = CoalescePartitionsExec::new(Arc::new(csv));
+        let merge = CoalescePartitionsExec::new(csv);
 
         // output of CoalescePartitionsExec should have a single partition
         assert_eq!(merge.output_partitioning().partition_count(), 1);
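
The twenty deleted lines above collapse into one call to test::scan_partitioned_csv. That helper is defined outside this diff; a plausible sketch, inferred from the removed code and from its call sites (a partition count in, an already-wrapped plan out, so callers no longer write Arc::new(csv)), is:

    use std::sync::Arc;

    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
    use crate::error::Result;
    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
    use crate::physical_plan::{ExecutionPlan, Statistics};
    use crate::test::create_partitioned_csv;
    use crate::test_util::aggr_test_schema;

    /// Hypothetical reconstruction (not part of this diff): split
    /// aggregate_test_100.csv into `partitions` files and scan them
    /// with default CsvExec settings.
    pub fn scan_partitioned_csv(partitions: usize) -> Result<Arc<dyn ExecutionPlan>> {
        let (_, files) = create_partitioned_csv("aggregate_test_100.csv", partitions)?;
        let config = FileScanConfig {
            object_store: Arc::new(LocalFileSystem {}),
            file_schema: aggr_test_schema(),
            file_groups: files,
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
        };
        Ok(Arc::new(CsvExec::new(config, true, b',')))
    }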
92 changes: 25 additions & 67 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -212,9 +212,8 @@ pub async fn plan_to_csv(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
-    use crate::datasource::listing::local_unpartitioned_file;
     use crate::prelude::*;
+    use crate::test::partitioned_csv_config;
     use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
     use arrow::datatypes::*;
@@ -228,22 +227,11 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
-        let path = format!("{}/csv/{}", testdata, filename);
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![local_unpartitioned_file(path)]],
-                statistics: Statistics::default(),
-                projection: Some(vec![0, 2, 4]),
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+        config.projection = Some(vec![0, 2, 4]);
+
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
@@ -275,22 +263,11 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
-        let path = format!("{}/csv/{}", testdata, filename);
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![local_unpartitioned_file(path)]],
-                statistics: Statistics::default(),
-                projection: None,
-                limit: Some(5),
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+        config.limit = Some(5);
+
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(13, csv.projected_schema.fields().len());
         assert_eq!(13, csv.schema().fields().len());
@@ -322,22 +299,11 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema_with_missing_col();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
-        let path = format!("{}/csv/{}", testdata, filename);
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![local_unpartitioned_file(path)]],
-                statistics: Statistics::default(),
-                projection: None,
-                limit: Some(5),
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let mut config = partitioned_csv_config(filename, file_schema, 1)?;
+        config.limit = Some(5);
+
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(14, csv.base_config.file_schema.fields().len());
         assert_eq!(14, csv.projected_schema.fields().len());
         assert_eq!(14, csv.schema().fields().len());
@@ -369,29 +335,21 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let file_schema = aggr_test_schema();
-        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
+        let mut config = partitioned_csv_config(filename, file_schema.clone(), 1)?;
+
+        // Add partition columns
+        config.table_partition_cols = vec!["date".to_owned()];
+        config.file_groups[0][0].partition_values =
+            vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
+
+        // We should be able to project on the partition column
+        // Which is supposed to be after the file fields
+        config.projection = Some(vec![0, file_schema.fields().len()]);
+
         // we don't have `/date=xx/` in the path but that is ok because
         // partitions are resolved during scan anyway
-        let path = format!("{}/csv/{}", testdata, filename);
-        let mut partitioned_file = local_unpartitioned_file(path);
-        partitioned_file.partition_values =
-            vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
-        let csv = CsvExec::new(
-            FileScanConfig {
-                // we should be able to project on the partition column
-                // wich is supposed to be after the file fields
-                projection: Some(vec![0, file_schema.fields().len()]),
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![partitioned_file]],
-                statistics: Statistics::default(),
-                limit: None,
-                table_partition_cols: vec!["date".to_owned()],
-            },
-            true,
-            b',',
-        );
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(2, csv.projected_schema.fields().len());
         assert_eq!(2, csv.schema().fields().len());
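
All four tests above now follow the same pattern: fetch a baseline config from test::partitioned_csv_config, override a single field (projection, limit, or the partition columns), then build the exec. The helper itself is not shown in this diff; judging from its call sites (a filename, a schema, and a partition count in, a mutable FileScanConfig out), a sketch could look like:

    use std::sync::Arc;

    use arrow::datatypes::SchemaRef;

    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
    use crate::error::Result;
    use crate::physical_plan::file_format::FileScanConfig;
    use crate::physical_plan::Statistics;
    use crate::test::create_partitioned_csv;

    /// Hypothetical reconstruction (not part of this diff): a neutral scan
    /// config over `partitions` slices of `filename`, with no projection,
    /// no limit, and no partition columns, ready for tests to override.
    pub fn partitioned_csv_config(
        filename: &str,
        schema: SchemaRef,
        partitions: usize,
    ) -> Result<FileScanConfig> {
        let (_, file_groups) = create_partitioned_csv(filename, partitions)?;
        Ok(FileScanConfig {
            object_store: Arc::new(LocalFileSystem {}),
            file_schema: schema,
            file_groups,
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
        })
    }

Returning the config rather than a finished CsvExec is what keeps each test down to one meaningful line of setup while still letting it exercise a different knob; scan_partitioned_csv, used in the other files, can then be a thin wrapper that builds and wraps the exec.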
37 changes: 3 additions & 34 deletions datafusion/core/src/physical_plan/filter.rs
@@ -235,9 +235,7 @@ impl RecordBatchStream for FilterExecStream {
 mod tests {
 
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::*;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::ExecutionPlan;
     use crate::physical_plan::{collect, with_new_children_if_necessary};
     use crate::prelude::SessionContext;
@@ -254,22 +252,7 @@ mod tests {
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(partitions)?;
 
         let predicate: Arc<dyn PhysicalExpr> = binary(
             binary(
@@ -289,7 +272,7 @@ mod tests {
         )?;
 
         let filter: Arc<dyn ExecutionPlan> =
-            Arc::new(FilterExec::try_new(predicate, Arc::new(csv))?);
+            Arc::new(FilterExec::try_new(predicate, csv)?);
 
         let results = collect(filter, task_ctx).await?;
 
@@ -307,21 +290,7 @@ mod tests {
     async fn with_new_children() -> Result<()> {
         let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-        let input = Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        ));
+        let input = test::scan_partitioned_csv(partitions)?;
 
         let predicate: Arc<dyn PhysicalExpr> = binary(
             col("c2", &schema)?,
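
Note the second effect of the helper visible in this file: the old code wrapped the exec at every use (Arc::new(csv)), while the new call sites pass csv straight to FilterExec::try_new, so the helper evidently returns the plan already wrapped. A minimal illustration under that assumption:

    use std::sync::Arc;

    use crate::error::Result;
    use crate::physical_plan::filter::FilterExec;
    use crate::physical_plan::PhysicalExpr;
    use crate::test;

    /// Hypothetical illustration: scan_partitioned_csv is assumed to return
    /// Arc<dyn ExecutionPlan>, so it feeds FilterExec::try_new directly,
    /// with no extra Arc::new at the call site.
    fn build_filter(predicate: Arc<dyn PhysicalExpr>) -> Result<FilterExec> {
        let input = test::scan_partitioned_csv(4)?;
        FilterExec::try_new(predicate, input)
    }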
25 changes: 3 additions & 22 deletions datafusion/core/src/physical_plan/limit.rs
@@ -426,42 +426,23 @@ mod tests {
     use common::collect;
 
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::prelude::SessionContext;
-    use crate::{test, test_util};
+    use crate::test;
 
     #[tokio::test]
     async fn limit() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let schema = test_util::aggr_test_schema();
 
         let num_partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: schema,
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(num_partitions)?;
 
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let limit =
-            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);
+        let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 7);
 
         // the result should contain 4 batches (one per input partition)
         let iter = limit.execute(0, task_ctx)?;
25 changes: 3 additions & 22 deletions datafusion/core/src/physical_plan/projection.rs
@@ -295,9 +295,7 @@ impl RecordBatchStream for ProjectionStream {
 mod tests {
 
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::{self, col};
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use crate::test::{self};
@@ -311,28 +309,11 @@ mod tests {
         let schema = test_util::aggr_test_schema();
 
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(partitions)?;
 
         // pick column c1 and name it column c1 in the output schema
-        let projection = ProjectionExec::try_new(
-            vec![(col("c1", &schema)?, "c1".to_string())],
-            Arc::new(csv),
-        )?;
+        let projection =
+            ProjectionExec::try_new(vec![(col("c1", &schema)?, "c1".to_string())], csv)?;
 
         let col_field = projection.schema.field(0);
         let col_metadata = col_field.metadata().unwrap().clone();