Add datafusion::test_util, resolve test data paths without env vars
mluts committed Jun 4, 2021
1 parent 01b57f7 commit 177cdc0
Showing 12 changed files with 191 additions and 26 deletions.
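In short: call sites that previously went through `arrow::util::test_util` now use the new `datafusion::test_util` module, which resolves the test data directories relative to the crate's git submodules when the `ARROW_TEST_DATA`/`PARQUET_TEST_DATA` environment variables are unset (hence the exports removed from the lib.rs docs below). A minimal sketch of the new call, assuming the `testing` submodule is checked out:

// The helper falls back to the `testing/data` submodule;
// ARROW_TEST_DATA remains an optional override.
let testdata = datafusion::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
assert!(std::path::Path::new(&path).exists());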
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_sql.rs
@@ -27,7 +27,7 @@ async fn main() -> Result<()> {
// create local execution context
let mut ctx = ExecutionContext::new();

- let testdata = datafusion::arrow::util::test_util::arrow_test_data();
+ let testdata = datafusion::test_util::arrow_test_data();

// register csv file with the execution context
ctx.register_csv(
2 changes: 1 addition & 1 deletion datafusion/benches/sort_limit_query_sql.rs
@@ -57,7 +57,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
Field::new("c13", DataType::Utf8, false),
]));

- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = datafusion::test_util::arrow_test_data();

// create CSV data source
let csv = CsvFile::try_new(
4 changes: 2 additions & 2 deletions datafusion/src/datasource/csv.rs
@@ -25,7 +25,7 @@
//! use datafusion::datasource::TableProvider;
//! use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
//!
- //! let testdata = arrow::util::test_util::arrow_test_data();
+ //! let testdata = datafusion::test_util::arrow_test_data();
//! let csvdata = CsvFile::try_new(
//! &format!("{}/csv/aggregate_test_100.csv", testdata),
//! CsvReadOptions::new().delimiter(b'|'),
@@ -222,7 +222,7 @@ mod tests {

#[tokio::test]
async fn csv_file_from_reader() -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let buf = std::fs::read(path).unwrap();
2 changes: 1 addition & 1 deletion datafusion/src/datasource/parquet.rs
@@ -328,7 +328,7 @@ mod tests {
}

fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
- let testdata = arrow::util::test_util::parquet_test_data();
+ let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let table = ParquetTable::try_new(&filename, 2)?;
Ok(Arc::new(table))
2 changes: 1 addition & 1 deletion datafusion/src/execution/dataframe_impl.rs
@@ -369,7 +369,7 @@ mod tests {

fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
let schema = test::aggr_test_schema();
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
ctx.register_csv(
"aggregate_test_100",
&format!("{}/csv/aggregate_test_100.csv", testdata),
3 changes: 1 addition & 2 deletions datafusion/src/lib.rs
@@ -194,8 +194,6 @@
//! cd arrow-datafusion
//! # Download test data
//! git submodule update --init
- //! export PARQUET_TEST_DATA=parquet-testing/data
- //! export ARROW_TEST_DATA=testing/data
//!
//! cargo run --example csv_sql
//!
@@ -234,6 +232,7 @@ pub use parquet;

#[cfg(test)]
pub mod test;
+ pub mod test_util;

#[macro_use]
#[cfg(feature = "regex_expressions")]
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/csv.rs
@@ -442,7 +442,7 @@ mod tests {
#[tokio::test]
async fn csv_exec_with_projection() -> Result<()> {
let schema = aggr_test_schema();
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv = CsvExec::try_new(
@@ -470,7 +470,7 @@
#[tokio::test]
async fn csv_exec_without_projection() -> Result<()> {
let schema = aggr_test_schema();
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv = CsvExec::try_new(
@@ -498,7 +498,7 @@
#[tokio::test]
async fn csv_exec_with_reader() -> Result<()> {
let schema = aggr_test_schema();
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let buf = std::fs::read(path).unwrap();
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/parquet.rs
@@ -705,7 +705,7 @@ mod tests {

#[tokio::test]
async fn test() -> Result<()> {
- let testdata = arrow::util::test_util::parquet_test_data();
+ let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
let parquet_exec = ParquetExec::try_from_path(
&filename,
12 changes: 6 additions & 6 deletions datafusion/src/physical_plan/planner.rs
@@ -865,7 +865,7 @@ mod tests {

#[test]
fn test_all_operators() -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -905,7 +905,7 @@

#[test]
fn test_with_csv_plan() -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -924,7 +924,7 @@ mod tests {

#[test]
fn errors() -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);

@@ -1026,7 +1026,7 @@

#[test]
fn in_list_types() -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);

@@ -1074,7 +1074,7 @@

#[test]
fn hash_agg_input_schema() -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1097,7 +1097,7 @@ mod tests {

#[test]
fn hash_agg_group_by_partitioned() -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
2 changes: 1 addition & 1 deletion datafusion/src/test/mod.rs
@@ -52,7 +52,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {

/// Generated partitioned copy of a CSV file
pub fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<String> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/{}", testdata, filename);

let tmp_dir = TempDir::new()?;
166 changes: 166 additions & 0 deletions datafusion/src/test_util.rs
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utils to make testing easier

use std::{env, error::Error, path::PathBuf};

/// Returns the arrow test data directory, which is by default stored
/// in a git submodule rooted at `testing/data`.
///
/// The default can be overridden by the optional environment
/// variable `ARROW_TEST_DATA`
///
/// Panics when the directory cannot be found.
///
/// Example:
/// ```
/// let testdata = datafusion::test_util::arrow_test_data();
/// let csvdata = format!("{}/csv/aggregate_test_100.csv", testdata);
/// assert!(std::path::PathBuf::from(csvdata).exists());
/// ```
pub fn arrow_test_data() -> String {
match get_data_dir("ARROW_TEST_DATA", "../testing/data") {
Ok(pb) => pb.display().to_string(),
Err(err) => panic!("failed to get arrow data dir: {}", err),
}
}

/// Returns the parquet test data directory, which is by default
/// stored in a git submodule rooted at `parquet-testing/data`.
///
/// The default can be overridden by the optional environment variable
/// `PARQUET_TEST_DATA`
///
/// Panics when the directory cannot be found.
///
/// Example:
/// ```
/// let testdata = datafusion::test_util::parquet_test_data();
/// let filename = format!("{}/binary.parquet", testdata);
/// assert!(std::path::PathBuf::from(filename).exists());
/// ```
pub fn parquet_test_data() -> String {
match get_data_dir("PARQUET_TEST_DATA", "../parquet-testing/data") {
Ok(pb) => pb.display().to_string(),
Err(err) => panic!("failed to get parquet data dir: {}", err),
}
}

/// Returns a directory path for finding test data.
///
/// `udf_env`: name of an environment variable
///
/// `submodule_data`: fallback path (relative to `CARGO_MANIFEST_DIR`)
///
/// Returns either:
/// * the path referred to by `udf_env`, if that variable is set and refers to a directory
/// * the `submodule_data` directory relative to `CARGO_MANIFEST_DIR`
fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn Error>> {
// Try user defined env.
if let Ok(dir) = env::var(udf_env) {
let trimmed = dir.trim().to_string();
if !trimmed.is_empty() {
let pb = PathBuf::from(trimmed);
if pb.is_dir() {
return Ok(pb);
} else {
return Err(format!(
"the data dir `{}` defined by env {} not found",
pb.display().to_string(),
udf_env
)
.into());
}
}
}

// The env var is undefined or its value is empty, so fall back to the default dir.

// env "CARGO_MANIFEST_DIR" is "the directory containing the manifest of your package",
// set by `cargo run` or `cargo test`, see:
// https://doc.rust-lang.org/cargo/reference/environment-variables.html
let dir = env!("CARGO_MANIFEST_DIR");

let pb = PathBuf::from(dir).join(submodule_data);
if pb.is_dir() {
Ok(pb)
} else {
Err(format!(
"env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
HINT: try running `git submodule update --init`",
udf_env,
pb.display(),
).into())
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::env;

#[test]
fn test_data_dir() {
let udf_env = "get_data_dir";
let cwd = env::current_dir().unwrap();

let existing_pb = cwd.join("..");
let existing = existing_pb.display().to_string();
let existing_str = existing.as_str();

let non_existing = cwd.join("non-existing-dir").display().to_string();
let non_existing_str = non_existing.as_str();

env::set_var(udf_env, non_existing_str);
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_err());

env::set_var(udf_env, "");
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);

env::set_var(udf_env, " ");
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);

env::set_var(udf_env, existing_str);
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);

env::remove_var(udf_env);
let res = get_data_dir(udf_env, non_existing_str);
assert!(res.is_err());

let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);
}

#[test]
fn test_happy() {
let res = arrow_test_data();
assert!(PathBuf::from(res).is_dir());

let res = parquet_test_data();
assert!(PathBuf::from(res).is_dir());
}
}
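Since `get_data_dir` is private, downstream code exercises the override behavior through the public wrappers. A hypothetical test sketch (not part of this commit) showing that a valid env var takes precedence over the submodule default:

#[test]
fn env_var_overrides_default() {
    // Point the override at the directory the default already resolves to,
    // so the assertion holds regardless of checkout location.
    let default_dir = datafusion::test_util::parquet_test_data();
    std::env::set_var("PARQUET_TEST_DATA", &default_dir);
    assert_eq!(datafusion::test_util::parquet_test_data(), default_dir);
    std::env::remove_var("PARQUET_TEST_DATA");
}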
14 changes: 7 additions & 7 deletions datafusion/tests/sql.rs
@@ -126,7 +126,7 @@ async fn parquet_query() {
#[tokio::test]
async fn parquet_single_nan_schema() {
let mut ctx = ExecutionContext::new();
- let testdata = arrow::util::test_util::parquet_test_data();
+ let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
.unwrap();
let sql = "SELECT mycol FROM single_nan";
@@ -144,7 +144,7 @@
#[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"]
async fn parquet_list_columns() {
let mut ctx = ExecutionContext::new();
- let testdata = arrow::util::test_util::parquet_test_data();
+ let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet(
"list_columns",
&format!("{}/list_columns.parquet", testdata),
@@ -2009,7 +2009,7 @@ fn aggr_test_schema() -> SchemaRef {
}

async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = datafusion::test_util::arrow_test_data();

// TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once
// unsigned is supported.
@@ -2049,7 +2049,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
}

fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
- let testdata = arrow::util::test_util::arrow_test_data();
+ let testdata = datafusion::test_util::arrow_test_data();
let schema = aggr_test_schema();
ctx.register_csv(
"aggregate_test_100",
@@ -2076,7 +2076,7 @@ fn register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()> {
}

fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
- let testdata = arrow::util::test_util::parquet_test_data();
+ let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
@@ -3374,7 +3374,7 @@ async fn test_physical_plan_display_indent() {
" CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
];

- let data_path = arrow::util::test_util::arrow_test_data();
+ let data_path = datafusion::test_util::arrow_test_data();
let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
.trim()
.lines()
@@ -3423,7 +3423,7 @@ async fn test_physical_plan_display_indent_multi_children() {
" CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
];

- let data_path = arrow::util::test_util::arrow_test_data();
+ let data_path = datafusion::test_util::arrow_test_data();
let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
.trim()
.lines()
