From 177cdc04b6da951fd1623d38083d3311e7fa4267 Mon Sep 17 00:00:00 2001
From: Michael Lutsiuk
Date: Thu, 3 Jun 2021 19:37:06 +0300
Subject: [PATCH] Add datafusion::test_util, resolve test data paths without env vars

---
 datafusion-examples/examples/csv_sql.rs    |   2 +-
 datafusion/benches/sort_limit_query_sql.rs |   2 +-
 datafusion/src/datasource/csv.rs           |   4 +-
 datafusion/src/datasource/parquet.rs       |   2 +-
 datafusion/src/execution/dataframe_impl.rs |   2 +-
 datafusion/src/lib.rs                      |   3 +-
 datafusion/src/physical_plan/csv.rs        |   6 +-
 datafusion/src/physical_plan/parquet.rs    |   2 +-
 datafusion/src/physical_plan/planner.rs    |  12 +-
 datafusion/src/test/mod.rs                 |   2 +-
 datafusion/src/test_util.rs                | 166 +++++++++++++++++++++
 datafusion/tests/sql.rs                    |  14 +-
 12 files changed, 191 insertions(+), 26 deletions(-)
 create mode 100644 datafusion/src/test_util.rs

diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs
index 76c87960d71d..a06b42ad4cb0 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
     // create local execution context
     let mut ctx = ExecutionContext::new();
 
-    let testdata = datafusion::arrow::util::test_util::arrow_test_data();
+    let testdata = datafusion::test_util::arrow_test_data();
 
     // register csv file with the execution context
     ctx.register_csv(
diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs
index be065f32e009..1e8339ea31eb 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -57,7 +57,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
         Field::new("c13", DataType::Utf8, false),
     ]));
 
-    let testdata = arrow::util::test_util::arrow_test_data();
+    let testdata = datafusion::test_util::arrow_test_data();
 
     // create CSV data source
     let csv = CsvFile::try_new(
diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 10e6659089b4..e1a61595f2ee 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -25,7 +25,7 @@
 //! use datafusion::datasource::TableProvider;
 //! use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
 //!
-//! let testdata = arrow::util::test_util::arrow_test_data();
+//! let testdata = datafusion::test_util::arrow_test_data();
 //! let csvdata = CsvFile::try_new(
 //!     &format!("{}/csv/aggregate_test_100.csv", testdata),
 //!     CsvReadOptions::new().delimiter(b'|'),
@@ -222,7 +222,7 @@ mod tests {
 
     #[tokio::test]
     async fn csv_file_from_reader() -> Result<()> {
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let buf = std::fs::read(path).unwrap();
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index 30e47df5f649..abfb81d99887 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -328,7 +328,7 @@ mod tests {
     }
 
     fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
-        let testdata = arrow::util::test_util::parquet_test_data();
+        let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, name);
         let table = ParquetTable::try_new(&filename, 2)?;
         Ok(Arc::new(table))
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index fdc75f92f2e7..19f71eb79268 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -369,7 +369,7 @@ mod tests {
 
     fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
         let schema = test::aggr_test_schema();
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         ctx.register_csv(
             "aggregate_test_100",
             &format!("{}/csv/aggregate_test_100.csv", testdata),
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index 5b8c9c13006a..e4501a78ada4 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -194,8 +194,6 @@
 //! cd arrow-datafusion
 //! # Download test data
 //! git submodule update --init
-//! export PARQUET_TEST_DATA=parquet-testing/data
-//! export ARROW_TEST_DATA=testing/data
 //!
 //! cargo run --example csv_sql
 //!
@@ -234,6 +232,7 @@ pub use parquet;
 
 #[cfg(test)]
 pub mod test;
+pub mod test_util;
 
 #[macro_use]
 #[cfg(feature = "regex_expressions")]
diff --git a/datafusion/src/physical_plan/csv.rs b/datafusion/src/physical_plan/csv.rs
index 9f88a53bc17c..544f98cba0c6 100644
--- a/datafusion/src/physical_plan/csv.rs
+++ b/datafusion/src/physical_plan/csv.rs
@@ -442,7 +442,7 @@ mod tests {
     #[tokio::test]
     async fn csv_exec_with_projection() -> Result<()> {
         let schema = aggr_test_schema();
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::try_new(
@@ -470,7 +470,7 @@ mod tests {
     #[tokio::test]
     async fn csv_exec_without_projection() -> Result<()> {
         let schema = aggr_test_schema();
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let csv = CsvExec::try_new(
@@ -498,7 +498,7 @@ mod tests {
     #[tokio::test]
     async fn csv_exec_with_reader() -> Result<()> {
         let schema = aggr_test_schema();
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let filename = "aggregate_test_100.csv";
         let path = format!("{}/csv/{}", testdata, filename);
         let buf = std::fs::read(path).unwrap();
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index 55a6d96738cb..2bea94aee1e5 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -705,7 +705,7 @@ mod tests {
 
     #[tokio::test]
     async fn test() -> Result<()> {
-        let testdata = arrow::util::test_util::parquet_test_data();
+        let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/alltypes_plain.parquet", testdata);
         let parquet_exec = ParquetExec::try_from_path(
             &filename,
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 7ddfaf8f6897..5b4e859be4f8 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -865,7 +865,7 @@ mod tests {
 
     #[test]
     fn test_all_operators() -> Result<()> {
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -905,7 +905,7 @@ mod tests {
 
     #[test]
     fn test_with_csv_plan() -> Result<()> {
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -924,7 +924,7 @@ mod tests {
 
     #[test]
     fn errors() -> Result<()> {
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1026,7 +1026,7 @@ mod tests {
 
     #[test]
     fn in_list_types() -> Result<()> {
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1074,7 +1074,7 @@ mod tests {
 
     #[test]
     fn hash_agg_input_schema() -> Result<()> {
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1097,7 +1097,7 @@ mod tests {
 
     #[test]
     fn hash_agg_group_by_partitioned() -> Result<()> {
-        let testdata = arrow::util::test_util::arrow_test_data();
+        let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
         let options = CsvReadOptions::new().schema_infer_max_records(100);
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 926a69226169..51dfe7f3a099 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -52,7 +52,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
 
 /// Generated partitioned copy of a CSV file
 pub fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<String> {
-    let testdata = arrow::util::test_util::arrow_test_data();
+    let testdata = crate::test_util::arrow_test_data();
     let path = format!("{}/csv/{}", testdata, filename);
 
     let tmp_dir = TempDir::new()?;
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
new file mode 100644
index 000000000000..e96e8e0c209f
--- /dev/null
+++ b/datafusion/src/test_util.rs
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utils to make testing easier
+
+use std::{env, error::Error, path::PathBuf};
+
+/// Returns the arrow test data directory, which is by default stored
+/// in a git submodule rooted at `testing/data`.
+///
+/// The default can be overridden by the optional environment
+/// variable `ARROW_TEST_DATA`
+///
+/// Panics when the directory cannot be found.
+///
+/// Example:
+/// ```
+/// let testdata = datafusion::test_util::arrow_test_data();
+/// let csvdata = format!("{}/csv/aggregate_test_100.csv", testdata);
+/// assert!(std::path::PathBuf::from(csvdata).exists());
+/// ```
+pub fn arrow_test_data() -> String {
+    match get_data_dir("ARROW_TEST_DATA", "../testing/data") {
+        Ok(pb) => pb.display().to_string(),
+        Err(err) => panic!("failed to get arrow data dir: {}", err),
+    }
+}
+
+/// Returns the parquet test data directory, which is by default
+/// stored in a git submodule rooted at
+/// `parquet-testing/data`.
+///
+/// The default can be overridden by the optional environment variable
+/// `PARQUET_TEST_DATA`
+///
+/// Panics when the directory cannot be found.
+///
+/// Example:
+/// ```
+/// let testdata = datafusion::test_util::parquet_test_data();
+/// let filename = format!("{}/binary.parquet", testdata);
+/// assert!(std::path::PathBuf::from(filename).exists());
+/// ```
+pub fn parquet_test_data() -> String {
+    match get_data_dir("PARQUET_TEST_DATA", "../parquet-testing/data") {
+        Ok(pb) => pb.display().to_string(),
+        Err(err) => panic!("failed to get parquet data dir: {}", err),
+    }
+}
+
+/// Returns a directory path for finding test data.
+///
+/// udf_env: name of an environment variable
+///
+/// submodule_data: fallback path (relative to CARGO_MANIFEST_DIR)
+///
+/// Returns either:
+/// The path referred to in `udf_env` if that variable is set and refers to a directory
+/// The `submodule_data` directory relative to CARGO_MANIFEST_DIR
+fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn Error>> {
+    // Try user defined env.
+    if let Ok(dir) = env::var(udf_env) {
+        let trimmed = dir.trim().to_string();
+        if !trimmed.is_empty() {
+            let pb = PathBuf::from(trimmed);
+            if pb.is_dir() {
+                return Ok(pb);
+            } else {
+                return Err(format!(
+                    "the data dir `{}` defined by env {} not found",
+                    pb.display().to_string(),
+                    udf_env
+                )
+                .into());
+            }
+        }
+    }
+
+    // The env is undefined or its value is trimmed to empty, let's try default dir.
+
+    // env "CARGO_MANIFEST_DIR" is "the directory containing the manifest of your package",
+    // set by `cargo run` or `cargo test`, see:
+    // https://doc.rust-lang.org/cargo/reference/environment-variables.html
+    let dir = env!("CARGO_MANIFEST_DIR");
+
+    let pb = PathBuf::from(dir).join(submodule_data);
+    if pb.is_dir() {
+        Ok(pb)
+    } else {
+        Err(format!(
+            "env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
+             HINT: try running `git submodule update --init`",
+            udf_env,
+            pb.display().to_string(),
+        ).into())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::env;
+
+    #[test]
+    fn test_data_dir() {
+        let udf_env = "get_data_dir";
+        let cwd = env::current_dir().unwrap();
+
+        let existing_pb = cwd.join("..");
+        let existing = existing_pb.display().to_string();
+        let existing_str = existing.as_str();
+
+        let non_existing = cwd.join("non-existing-dir").display().to_string();
+        let non_existing_str = non_existing.as_str();
+
+        env::set_var(udf_env, non_existing_str);
+        let res = get_data_dir(udf_env, existing_str);
+        assert!(res.is_err());
+
+        env::set_var(udf_env, "");
+        let res = get_data_dir(udf_env, existing_str);
+        assert!(res.is_ok());
+        assert_eq!(res.unwrap(), existing_pb);
+
+        env::set_var(udf_env, " ");
+        let res = get_data_dir(udf_env, existing_str);
+        assert!(res.is_ok());
+        assert_eq!(res.unwrap(), existing_pb);
+
+        env::set_var(udf_env, existing_str);
+        let res = get_data_dir(udf_env, existing_str);
+        assert!(res.is_ok());
+        assert_eq!(res.unwrap(), existing_pb);
+
+        env::remove_var(udf_env);
+        let res = get_data_dir(udf_env, non_existing_str);
+        assert!(res.is_err());
+
+        let res = get_data_dir(udf_env, existing_str);
+        assert!(res.is_ok());
+        assert_eq!(res.unwrap(), existing_pb);
+    }
+
+    #[test]
+    fn test_happy() {
+        let res = arrow_test_data();
+        assert!(PathBuf::from(res).is_dir());
+
+        let res = parquet_test_data();
+        assert!(PathBuf::from(res).is_dir());
+    }
+}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 029e9307e5f6..d77671e7f4ff 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -126,7 +126,7 @@ async fn parquet_query() {
 #[tokio::test]
 async fn parquet_single_nan_schema() {
     let mut ctx = ExecutionContext::new();
-    let testdata = arrow::util::test_util::parquet_test_data();
+    let testdata = datafusion::test_util::parquet_test_data();
     ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
         .unwrap();
     let sql = "SELECT mycol FROM single_nan";
@@ -144,7 +144,7 @@ async fn parquet_single_nan_schema() {
 #[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"]
 async fn parquet_list_columns() {
     let mut ctx = ExecutionContext::new();
-    let testdata = arrow::util::test_util::parquet_test_data();
+    let testdata = datafusion::test_util::parquet_test_data();
     ctx.register_parquet(
         "list_columns",
         &format!("{}/list_columns.parquet", testdata),
@@ -2009,7 +2009,7 @@ fn aggr_test_schema() -> SchemaRef {
 }
 
 async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
-    let testdata = arrow::util::test_util::arrow_test_data();
+    let testdata = datafusion::test_util::arrow_test_data();
 
     // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once
     // unsigned is supported.
@@ -2049,7 +2049,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
 }
 
 fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
-    let testdata = arrow::util::test_util::arrow_test_data();
+    let testdata = datafusion::test_util::arrow_test_data();
     let schema = aggr_test_schema();
     ctx.register_csv(
         "aggregate_test_100",
@@ -2076,7 +2076,7 @@ fn register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()> {
 }
 
 fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
-    let testdata = arrow::util::test_util::parquet_test_data();
+    let testdata = datafusion::test_util::parquet_test_data();
     ctx.register_parquet(
         "alltypes_plain",
         &format!("{}/alltypes_plain.parquet", testdata),
@@ -3374,7 +3374,7 @@ async fn test_physical_plan_display_indent() {
         " CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
     ];
 
-    let data_path = arrow::util::test_util::arrow_test_data();
+    let data_path = datafusion::test_util::arrow_test_data();
     let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
         .trim()
         .lines()
@@ -3423,7 +3423,7 @@ async fn test_physical_plan_display_indent_multi_children() {
         " CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
     ];
 
-    let data_path = arrow::util::test_util::arrow_test_data();
+    let data_path = datafusion::test_util::arrow_test_data();
     let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
         .trim()
         .lines()
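
With this patch applied, tests resolve fixture paths without any `export` step. Below is a minimal sketch (not part of the patch) of what a downstream test can now look like; it assumes the 4.x-era API used elsewhere in this diff (`ExecutionContext::new`, a synchronous `ctx.sql`, an async `collect`), and the test name and query are illustrative only:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Hypothetical test, not part of the patch. No env vars are required:
// `arrow_test_data()` falls back to the `testing/data` submodule resolved
// against CARGO_MANIFEST_DIR, and panics with a `git submodule update --init`
// hint if the submodule has not been checked out.
#[tokio::test]
async fn queries_aggregate_test_100_without_env_vars() -> Result<()> {
    let testdata = datafusion::test_util::arrow_test_data();

    let mut ctx = ExecutionContext::new();
    ctx.register_csv(
        "aggregate_test_100",
        &format!("{}/csv/aggregate_test_100.csv", testdata),
        CsvReadOptions::new(),
    )?;

    // `sql` returned a DataFrame synchronously at this point in the API's history.
    let df = ctx.sql("SELECT count(c1) FROM aggregate_test_100")?;
    let results = df.collect().await?;
    assert!(!results.is_empty());
    Ok(())
}
```

An explicit `ARROW_TEST_DATA`/`PARQUET_TEST_DATA` setting still takes precedence, so CI jobs that vendor the test data elsewhere keep working unchanged.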
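For review purposes, the precedence rule in `get_data_dir` boils down to the following simplified, hypothetical stand-in (`resolve` is not in the patch, and it skips the `is_dir` validation and error reporting the real function performs):

```rust
use std::{env, path::PathBuf};

// Simplified mirror of `get_data_dir`'s precedence rule: a non-empty
// (after trimming) env var wins; otherwise fall back to the submodule path
// relative to the crate's manifest directory, which cargo sets for both
// `cargo test` and `cargo run`.
fn resolve(udf_env: &str, submodule_data: &str) -> PathBuf {
    match env::var(udf_env) {
        Ok(dir) if !dir.trim().is_empty() => PathBuf::from(dir.trim()),
        _ => PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(submodule_data),
    }
}

fn main() {
    // Fallback: `../testing/data` next to this crate's Cargo.toml.
    env::remove_var("ARROW_TEST_DATA");
    println!("{}", resolve("ARROW_TEST_DATA", "../testing/data").display());

    // An override still wins, exactly as before this patch.
    env::set_var("ARROW_TEST_DATA", "/opt/arrow-testing/data");
    println!("{}", resolve("ARROW_TEST_DATA", "../testing/data").display());
}
```

Anchoring the fallback to `CARGO_MANIFEST_DIR` rather than the process's current directory is what makes it stable: `cargo test` may be invoked from the workspace root or from `datafusion/`, but the manifest directory is always the crate's own.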