From feff7f2ccc4193eadbce3510180b3c9edf2f8f88 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 14 May 2022 13:46:59 +0100 Subject: [PATCH] Reduce duplication in file scan tests --- .../src/physical_plan/coalesce_partitions.rs | 22 +--- .../core/src/physical_plan/file_format/csv.rs | 92 +++++----------- datafusion/core/src/physical_plan/filter.rs | 37 +------ datafusion/core/src/physical_plan/limit.rs | 25 +---- .../core/src/physical_plan/projection.rs | 25 +---- .../core/src/physical_plan/sorts/sort.rs | 49 ++------- .../sorts/sort_preserving_merge.rs | 39 +------ datafusion/core/src/physical_plan/union.rs | 51 ++------- .../core/src/physical_plan/windows/mod.rs | 28 +---- datafusion/core/src/test/mod.rs | 102 +++++++++++------- 10 files changed, 118 insertions(+), 352 deletions(-) diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index 11fcd5d50be8..d1c797eacd5c 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -209,41 +209,23 @@ mod tests { use futures::FutureExt; use super::*; - use crate::datafusion_data_access::object_store::local::LocalFileSystem; - use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::{collect, common}; use crate::prelude::SessionContext; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{self, assert_is_pending}; - use crate::test_util; #[tokio::test] async fn merge() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let schema = test_util::aggr_test_schema(); let num_partitions = 4; - let (_, files) = - test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?; - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: schema, - file_groups: files, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); + let csv = test::scan_partitioned_csv(num_partitions)?; // input should have 4 partitions assert_eq!(csv.output_partitioning().partition_count(), num_partitions); - let merge = CoalescePartitionsExec::new(Arc::new(csv)); + let merge = CoalescePartitionsExec::new(csv); // output of CoalescePartitionsExec should have a single partition assert_eq!(merge.output_partitioning().partition_count(), 1); diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 96bceb2251f6..b78662e46713 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -212,9 +212,8 @@ pub async fn plan_to_csv( #[cfg(test)] mod tests { use super::*; - use crate::datafusion_data_access::object_store::local::LocalFileSystem; - use crate::datasource::listing::local_unpartitioned_file; use crate::prelude::*; + use crate::test::partitioned_csv_config; use crate::test_util::aggr_test_schema_with_missing_col; use crate::{scalar::ScalarValue, test_util::aggr_test_schema}; use arrow::datatypes::*; @@ -228,22 +227,11 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema(); - let testdata = crate::test_util::arrow_test_data(); let filename = "aggregate_test_100.csv"; - let path = format!("{}/csv/{}", testdata, filename); - let csv = CsvExec::new( - 
FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema, - file_groups: vec![vec![local_unpartitioned_file(path)]], - statistics: Statistics::default(), - projection: Some(vec![0, 2, 4]), - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); + let mut config = partitioned_csv_config(filename, file_schema, 1)?; + config.projection = Some(vec![0, 2, 4]); + + let csv = CsvExec::new(config, true, b','); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(3, csv.projected_schema.fields().len()); assert_eq!(3, csv.schema().fields().len()); @@ -275,22 +263,11 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema(); - let testdata = crate::test_util::arrow_test_data(); let filename = "aggregate_test_100.csv"; - let path = format!("{}/csv/{}", testdata, filename); - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema, - file_groups: vec![vec![local_unpartitioned_file(path)]], - statistics: Statistics::default(), - projection: None, - limit: Some(5), - table_partition_cols: vec![], - }, - true, - b',', - ); + let mut config = partitioned_csv_config(filename, file_schema, 1)?; + config.limit = Some(5); + + let csv = CsvExec::new(config, true, b','); assert_eq!(13, csv.base_config.file_schema.fields().len()); assert_eq!(13, csv.projected_schema.fields().len()); assert_eq!(13, csv.schema().fields().len()); @@ -322,22 +299,11 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema_with_missing_col(); - let testdata = crate::test_util::arrow_test_data(); let filename = "aggregate_test_100.csv"; - let path = format!("{}/csv/{}", testdata, filename); - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema, - file_groups: vec![vec![local_unpartitioned_file(path)]], - statistics: Statistics::default(), - projection: None, - limit: Some(5), - table_partition_cols: vec![], - }, - true, - b',', - ); + let mut config = partitioned_csv_config(filename, file_schema, 1)?; + config.limit = Some(5); + + let csv = CsvExec::new(config, true, b','); assert_eq!(14, csv.base_config.file_schema.fields().len()); assert_eq!(14, csv.projected_schema.fields().len()); assert_eq!(14, csv.schema().fields().len()); @@ -369,29 +335,21 @@ mod tests { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let file_schema = aggr_test_schema(); - let testdata = crate::test_util::arrow_test_data(); let filename = "aggregate_test_100.csv"; + let mut config = partitioned_csv_config(filename, file_schema.clone(), 1)?; + + // Add partition columns + config.table_partition_cols = vec!["date".to_owned()]; + config.file_groups[0][0].partition_values = + vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))]; + + // We should be able to project on the partition column + // Which is supposed to be after the file fields + config.projection = Some(vec![0, file_schema.fields().len()]); + // we don't have `/date=xx/` in the path but that is ok because // partitions are resolved during scan anyway - let path = format!("{}/csv/{}", testdata, filename); - let mut partitioned_file = local_unpartitioned_file(path); - partitioned_file.partition_values = - vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))]; - let csv = CsvExec::new( - FileScanConfig { - // we should be able to project on the partition column - // wich is 
supposed to be after the file fields
-                projection: Some(vec![0, file_schema.fields().len()]),
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema,
-                file_groups: vec![vec![partitioned_file]],
-                statistics: Statistics::default(),
-                limit: None,
-                table_partition_cols: vec!["date".to_owned()],
-            },
-            true,
-            b',',
-        );
+        let csv = CsvExec::new(config, true, b',');
         assert_eq!(13, csv.base_config.file_schema.fields().len());
         assert_eq!(2, csv.projected_schema.fields().len());
         assert_eq!(2, csv.schema().fields().len());
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 4aa6453bfb73..b91c6bb9d5d6 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -235,9 +235,7 @@ impl RecordBatchStream for FilterExecStream {
 mod tests {

     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::expressions::*;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::ExecutionPlan;
     use crate::physical_plan::{collect, with_new_children_if_necessary};
     use crate::prelude::SessionContext;
@@ -254,22 +252,7 @@
         let schema = test_util::aggr_test_schema();

         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(partitions)?;

         let predicate: Arc<dyn PhysicalExpr> = binary(
             binary(
@@ -289,7 +272,7 @@
         )?;

         let filter: Arc<dyn ExecutionPlan> =
-            Arc::new(FilterExec::try_new(predicate, Arc::new(csv))?);
+            Arc::new(FilterExec::try_new(predicate, csv)?);

         let results = collect(filter, task_ctx).await?;

@@ -307,21 +290,7 @@
     async fn with_new_children() -> Result<()> {
         let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-        let input = Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        ));
+        let input = test::scan_partitioned_csv(partitions)?;

         let predicate: Arc<dyn PhysicalExpr> = binary(
             col("c2", &schema)?,
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 00516db0bf75..6ad93d3e406f 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -426,42 +426,23 @@ mod tests {

     use common::collect;

     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::prelude::SessionContext;
-    use crate::{test, test_util};
+    use crate::test;

     #[tokio::test]
     async fn limit() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();

-        let schema = test_util::aggr_test_schema();
         let num_partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
-
-        let csv = 
CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: schema, - file_groups: files, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); + let csv = test::scan_partitioned_csv(num_partitions)?; // input should have 4 partitions assert_eq!(csv.output_partitioning().partition_count(), num_partitions); - let limit = - GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7); + let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(csv)), 7); // the result should contain 4 batches (one per input partition) let iter = limit.execute(0, task_ctx)?; diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 8e8a1ee54d3c..5fa3c93cdd42 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -295,9 +295,7 @@ impl RecordBatchStream for ProjectionStream { mod tests { use super::*; - use crate::datafusion_data_access::object_store::local::LocalFileSystem; use crate::physical_plan::expressions::{self, col}; - use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::prelude::SessionContext; use crate::scalar::ScalarValue; use crate::test::{self}; @@ -311,28 +309,11 @@ mod tests { let schema = test_util::aggr_test_schema(); let partitions = 4; - let (_, files) = - test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; - - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: Arc::clone(&schema), - file_groups: files, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); + let csv = test::scan_partitioned_csv(partitions)?; // pick column c1 and name it column c1 in the output schema - let projection = ProjectionExec::try_new( - vec![(col("c1", &schema)?, "c1".to_string())], - Arc::new(csv), - )?; + let projection = + ProjectionExec::try_new(vec![(col("c1", &schema)?, "c1".to_string())], csv)?; let col_field = projection.schema.field(0); let col_metadata = col_field.metadata().unwrap().clone(); diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 3a0b5f2a16dc..c75f73f56d29 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -915,21 +915,16 @@ async fn do_sort( #[cfg(test)] mod tests { use super::*; - use crate::datafusion_data_access::object_store::local::LocalFileSystem; use crate::execution::context::SessionConfig; use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use crate::physical_plan::collect; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::{ - collect, - file_format::{CsvExec, FileScanConfig}, - }; use crate::prelude::SessionContext; use crate::test; use crate::test::assert_is_pending; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; - use crate::test_util; use arrow::array::*; use arrow::compute::SortOptions; use arrow::datatypes::*; @@ -940,24 +935,9 @@ mod tests { async fn test_in_mem_sort() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let schema = test_util::aggr_test_schema(); let partitions = 4; - let (_, files) = - 
test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(partitions)?;
+        let schema = csv.schema();

         let sort_exec = Arc::new(SortExec::try_new(
             vec![
@@ -977,7 +957,7 @@
                     options: SortOptions::default(),
                 },
             ],
-            Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
+            Arc::new(CoalescePartitionsExec::new(csv)),
         )?);

         let result = collect(sort_exec, task_ctx).await?;
@@ -1017,24 +997,9 @@
         let runtime = Arc::new(RuntimeEnv::new(config)?);
         let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);

-        let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
+        let csv = test::scan_partitioned_csv(partitions)?;
+        let schema = csv.schema();

         let sort_exec = Arc::new(SortExec::try_new(
             vec![
@@ -1054,7 +1019,7 @@
                     options: SortOptions::default(),
                 },
             ],
-            Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
+            Arc::new(CoalescePartitionsExec::new(csv)),
         )?);

         let task_ctx = session_ctx.task_ctx();
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 8e3326255cff..515300eff5ce 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -601,7 +601,6 @@ impl RecordBatchStream for SortPreservingMergeStream {

 #[cfg(test)]
 mod tests {
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::from_slice::FromSlice;
     use crate::physical_plan::metrics::MetricValue;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
@@ -611,7 +610,6 @@
     use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::sorts::sort::SortExec;
     use crate::physical_plan::{collect, common};
@@ -897,24 +895,9 @@
     async fn test_partition_sort() {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
-
-        let csv = Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        ));
+        let csv = test::scan_partitioned_csv(partitions).unwrap();
+        let schema = csv.schema();

         let sort = vec![
             PhysicalSortExpr {
@@ -984,24 +967,8 @@
         sizes: &[usize],
         context: Arc<TaskContext>,
     ) -> Arc<dyn ExecutionPlan> {
-        let schema = test_util::aggr_test_schema();
         let partitions = 4;
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
-
-        let csv = Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: schema,
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        ));
+        let csv = test::scan_partitioned_csv(partitions).unwrap();

         let sorted = basic_sort(csv, sort, context).await;
         let split: Vec<_> = sizes.iter().map(|x| split_batch(&sorted, *x)).collect();
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index e89ac4a76d0e..d57fbe0f3df3 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -242,61 +242,22 @@ fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datafusion_data_access::object_store::{
-        local::LocalFileSystem, ObjectStore,
-    };
-    use crate::{test, test_util};
+    use crate::test;
     use crate::prelude::SessionContext;
-    use crate::{
-        physical_plan::{
-            collect,
-            file_format::{CsvExec, FileScanConfig},
-        },
-        scalar::ScalarValue,
-    };
+    use crate::{physical_plan::collect, scalar::ScalarValue};
     use arrow::record_batch::RecordBatch;

     #[tokio::test]
     async fn test_union_partitions() -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let schema = test_util::aggr_test_schema();
-        let fs: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem {});

         // Create csv's with different partitioning
-        let (_, files) = test::create_partitioned_csv("aggregate_test_100.csv", 4)?;
-        let (_, files2) = test::create_partitioned_csv("aggregate_test_100.csv", 5)?;
-
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::clone(&fs),
-                file_schema: Arc::clone(&schema),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
-
-        let csv2 = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::clone(&fs),
-                file_schema: Arc::clone(&schema),
-                file_groups: files2,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
-
-        let union_exec = Arc::new(UnionExec::new(vec![Arc::new(csv), Arc::new(csv2)]));
+        let csv = test::scan_partitioned_csv(4)?;
+        let csv2 = test::scan_partitioned_csv(5)?;
+
+        let union_exec = Arc::new(UnionExec::new(vec![csv, csv2]));

         // Should have 9 partitions and 9 output batches
         assert_eq!(union_exec.output_partitioning().partition_count(), 9);
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index c1a53aca61d3..496b4a6ef7f0 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -154,40 +154,22 @@ fn create_built_in_window_expr(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::physical_plan::aggregates::AggregateFunction;
     use crate::physical_plan::expressions::col;
-    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
-    use crate::physical_plan::{collect, Statistics};
+    use crate::physical_plan::file_format::CsvExec;
+    use crate::physical_plan::{collect, ExecutionPlan};
     use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
-    use crate::test_util::{self, aggr_test_schema};
     use arrow::array::*;
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use futures::FutureExt;

     fn create_test_schema(partitions: usize) -> Result<(Arc<dyn ExecutionPlan>, SchemaRef)> {
-        let schema = test_util::aggr_test_schema();
-        let (_, files) =
-            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
-        let csv = CsvExec::new(
-            FileScanConfig {
-                object_store: Arc::new(LocalFileSystem {}),
-                file_schema: aggr_test_schema(),
-                file_groups: files,
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-            },
-            true,
-            b',',
-        );
-
-        let input = Arc::new(csv);
-        Ok((input, schema))
+        let csv = test::scan_partitioned_csv(partitions)?;
+        let schema = csv.schema();
+        Ok((csv, schema))
     }

     #[tokio::test]
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 6927178281a9..c8523a52e44d 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -18,17 +18,17 @@
 //! Common unit test utility methods

 use crate::arrow::array::UInt32Array;
-use crate::datasource::{
-    listing::{local_unpartitioned_file, PartitionedFile},
-    MemTable, TableProvider,
-};
+use crate::datasource::{listing::local_unpartitioned_file, MemTable, TableProvider};
 use crate::error::Result;
 use crate::from_slice::FromSlice;
 use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
+use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
+use crate::test_util::aggr_test_schema;
 use array::{Array, ArrayRef};
 use arrow::array::{self, DecimalBuilder, Int32Array};
-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use datafusion_data_access::object_store::local::LocalFileSystem;
 use futures::{Future, FutureExt};
 use std::fs::File;
 use std::io::prelude::*;
@@ -54,55 +54,75 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
     Arc::new(provider)
 }

-/// Generated partitioned copy of a CSV file
-pub fn create_partitioned_csv(
+/// Returns a [`CsvExec`] that scans "aggregate_test_100.csv" with `partitions` partitions
+pub fn scan_partitioned_csv(partitions: usize) -> Result<Arc<CsvExec>> {
+    let schema = aggr_test_schema();
+    let config = partitioned_csv_config("aggregate_test_100.csv", schema, partitions)?;
+    Ok(Arc::new(CsvExec::new(config, true, b',')))
+}
+
+/// Returns a [`FileScanConfig`] for scanning `partitions` partitions of `filename`
+pub fn partitioned_csv_config(
     filename: &str,
+    schema: SchemaRef,
     partitions: usize,
-) -> Result<(String, Vec<Vec<PartitionedFile>>)> {
+) -> Result<FileScanConfig> {
     let testdata = crate::test_util::arrow_test_data();
     let path = format!("{}/csv/{}", testdata, filename);
-    let tmp_dir = TempDir::new()?;

-    let mut writers = vec![];
-    let mut files = vec![];
-    for i in 0..partitions {
-        let filename = format!("partition-{}.csv", i);
-        let filename = tmp_dir.path().join(&filename);
+    let file_groups = if partitions > 1 {
+        let tmp_dir = TempDir::new()?.into_path();

-        let writer = BufWriter::new(File::create(&filename).unwrap());
-        writers.push(writer);
-        files.push(filename);
-    }
+        let mut writers = vec![];
+        let mut files = vec![];
+        for i in 0..partitions {
+            let filename = format!("partition-{}.csv", i);
+            let filename = tmp_dir.join(&filename);

-    let f = File::open(&path)?;
-    let f = BufReader::new(f);
-    for (i, line) in f.lines().enumerate() {
-        let line = line.unwrap();
+            let writer = BufWriter::new(File::create(&filename).unwrap());
+            writers.push(writer);
+            files.push(filename);
+        }

-        if i == 0 {
-            // write header to all partitions
-            for w in writers.iter_mut() {
-                w.write_all(line.as_bytes()).unwrap();
-                w.write_all(b"\n").unwrap();
+        let f = File::open(&path)?;
+        let f = BufReader::new(f);
+        for (i, line) in f.lines().enumerate() {
+            let line = line.unwrap();
+
+            if i == 0 {
+                // write header to all partitions
+                for w in writers.iter_mut() {
+                    w.write_all(line.as_bytes()).unwrap();
+                    w.write_all(b"\n").unwrap();
+                }
+            } else {
+                // write data line to single partition
+                let partition = i % partitions;
+                writers[partition].write_all(line.as_bytes()).unwrap();
+                writers[partition].write_all(b"\n").unwrap();
             }
-        } else {
-            // write data line to single partition
-            let partition = i % partitions;
-            writers[partition].write_all(line.as_bytes()).unwrap();
-            writers[partition].write_all(b"\n").unwrap();
         }
-    }

-    for w in writers.iter_mut() {
-        w.flush().unwrap();
-    }
+        for w in writers.iter_mut() {
+            w.flush().unwrap();
+        }

-    let groups = files
-        .into_iter()
-        .map(|f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned())])
-        .collect::<Vec<_>>();
+        files
+            .into_iter()
+            .map(|f| vec![local_unpartitioned_file(f.to_str().unwrap().to_owned())])
+            .collect::<Vec<_>>()
+    } else {
+        vec![vec![local_unpartitioned_file(path)]]
+    };

-    Ok((tmp_dir.into_path().to_str().unwrap().to_string(), groups))
+    Ok(FileScanConfig {
+        object_store: Arc::new(LocalFileSystem {}),
+        file_schema: schema,
+        file_groups,
+        statistics: Default::default(),
+        projection: None,
+        limit: None,
+        table_partition_cols: vec![],
+    })
 }

 /// some tests share a common table with different names
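
Usage note: with `scan_partitioned_csv` in place, a scan-based operator test shrinks to a few lines. A minimal sketch, assuming the crate-internal `crate::test` module and the ARROW_TEST_DATA fixture that `arrow_test_data()` resolves; the merge operator and the assertions are illustrative, not part of this patch:

    use std::sync::Arc;

    use crate::error::Result;
    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
    use crate::physical_plan::{collect, ExecutionPlan};
    use crate::prelude::SessionContext;
    use crate::test;

    #[tokio::test]
    async fn scan_and_merge() -> Result<()> {
        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();

        // One call replaces the hand-built FileScanConfig + CsvExec boilerplate.
        let csv = test::scan_partitioned_csv(4)?;
        assert_eq!(csv.output_partitioning().partition_count(), 4);

        // Wrap the scan in the operator under test and execute it.
        let merge: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(csv));
        assert_eq!(merge.output_partitioning().partition_count(), 1);

        let batches = collect(merge, task_ctx).await?;
        assert!(!batches.is_empty());
        Ok(())
    }

Tests that need a non-default scan keep the second helper: `partitioned_csv_config` returns the `FileScanConfig` itself, so fields such as `projection` or `limit` can be set before constructing the `CsvExec`, as the csv.rs tests above do.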