Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add datafusion::test_util, resolve test data paths without env vars #498

Merged
merged 1 commit into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> Result<()> {
// create local execution context
let mut ctx = ExecutionContext::new();

let testdata = datafusion::arrow::util::test_util::arrow_test_data();
let testdata = datafusion::test_util::arrow_test_data();

// register csv file with the execution context
ctx.register_csv(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
Field::new("c13", DataType::Utf8, false),
]));

let testdata = arrow::util::test_util::arrow_test_data();
let testdata = datafusion::test_util::arrow_test_data();

// create CSV data source
let csv = CsvFile::try_new(
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! use datafusion::datasource::TableProvider;
//! use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
//!
//! let testdata = arrow::util::test_util::arrow_test_data();
//! let testdata = datafusion::test_util::arrow_test_data();
//! let csvdata = CsvFile::try_new(
//! &format!("{}/csv/aggregate_test_100.csv", testdata),
//! CsvReadOptions::new().delimiter(b'|'),
Expand Down Expand Up @@ -222,7 +222,7 @@ mod tests {

#[tokio::test]
async fn csv_file_from_reader() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let buf = std::fs::read(path).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ mod tests {
}

fn load_table(name: &str) -> Result<Arc<dyn TableProvider>> {
let testdata = arrow::util::test_util::parquet_test_data();
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let table = ParquetTable::try_new(&filename, 2)?;
Ok(Arc::new(table))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ mod tests {

fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
let schema = test::aggr_test_schema();
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
ctx.register_csv(
"aggregate_test_100",
&format!("{}/csv/aggregate_test_100.csv", testdata),
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@
//! cd arrow-datafusion
//! # Download test data
//! git submodule update --init
//! export PARQUET_TEST_DATA=parquet-testing/data
//! export ARROW_TEST_DATA=testing/data
//!
//! cargo run --example csv_sql
//!
Expand Down Expand Up @@ -234,6 +232,7 @@ pub use parquet;

#[cfg(test)]
pub mod test;
pub mod test_util;

#[macro_use]
#[cfg(feature = "regex_expressions")]
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ mod tests {
#[tokio::test]
async fn csv_exec_with_projection() -> Result<()> {
let schema = aggr_test_schema();
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv = CsvExec::try_new(
Expand Down Expand Up @@ -470,7 +470,7 @@ mod tests {
#[tokio::test]
async fn csv_exec_without_projection() -> Result<()> {
let schema = aggr_test_schema();
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv = CsvExec::try_new(
Expand Down Expand Up @@ -498,7 +498,7 @@ mod tests {
#[tokio::test]
async fn csv_exec_with_reader() -> Result<()> {
let schema = aggr_test_schema();
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let buf = std::fs::read(path).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ mod tests {

#[tokio::test]
async fn test() -> Result<()> {
let testdata = arrow::util::test_util::parquet_test_data();
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
let parquet_exec = ParquetExec::try_from_path(
&filename,
Expand Down
12 changes: 6 additions & 6 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ mod tests {

#[test]
fn test_all_operators() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
Expand Down Expand Up @@ -905,7 +905,7 @@ mod tests {

#[test]
fn test_with_csv_plan() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
Expand All @@ -924,7 +924,7 @@ mod tests {

#[test]
fn errors() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);

Expand Down Expand Up @@ -1026,7 +1026,7 @@ mod tests {

#[test]
fn in_list_types() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
let options = CsvReadOptions::new().schema_infer_max_records(100);

Expand Down Expand Up @@ -1074,7 +1074,7 @@ mod tests {

#[test]
fn hash_agg_input_schema() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
Expand All @@ -1097,7 +1097,7 @@ mod tests {

#[test]
fn hash_agg_group_by_partitioned() -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/aggregate_test_100.csv", testdata);

let options = CsvReadOptions::new().schema_infer_max_records(100);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {

/// Generated partitioned copy of a CSV file
pub fn create_partitioned_csv(filename: &str, partitions: usize) -> Result<String> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = crate::test_util::arrow_test_data();
let path = format!("{}/csv/{}", testdata, filename);

let tmp_dir = TempDir::new()?;
Expand Down
166 changes: 166 additions & 0 deletions datafusion/src/test_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utils to make testing easier

use std::{env, error::Error, path::PathBuf};

/// Returns the arrow test data directory, by default the git
/// submodule checked out at `testing/data`.
///
/// Set the optional environment variable `ARROW_TEST_DATA` to
/// override the default location.
///
/// panics when the directory can not be found.
///
/// Example:
/// ```
/// let testdata = datafusion::test_util::arrow_test_data();
/// let csvdata = format!("{}/csv/aggregate_test_100.csv", testdata);
/// assert!(std::path::PathBuf::from(csvdata).exists());
/// ```
pub fn arrow_test_data() -> String {
    // Resolve via the shared lookup helper; a missing directory is a
    // test-environment setup error, so abort with a diagnostic.
    get_data_dir("ARROW_TEST_DATA", "../testing/data")
        .unwrap_or_else(|err| panic!("failed to get arrow data dir: {}", err))
        .display()
        .to_string()
}

/// Returns the parquet test data directory, which is by default
/// stored in a git submodule rooted at
/// `parquet-testing/data`.
///
/// The default can be overridden by the optional environment variable
/// `PARQUET_TEST_DATA`
///
/// panics when the directory can not be found.
///
/// Example:
/// ```
/// let testdata = datafusion::test_util::parquet_test_data();
/// let filename = format!("{}/binary.parquet", testdata);
/// assert!(std::path::PathBuf::from(filename).exists());
/// ```
pub fn parquet_test_data() -> String {
    match get_data_dir("PARQUET_TEST_DATA", "../parquet-testing/data") {
        Ok(pb) => pb.display().to_string(),
        Err(err) => panic!("failed to get parquet data dir: {}", err),
    }
}

/// Returns a directory path for finding test data.
///
/// `udf_env`: name of an environment variable
///
/// `submodule_data`: fallback path (relative to `CARGO_MANIFEST_DIR`)
///
/// Returns either:
/// The path referred to in `udf_env` if that variable is set and refers to a directory
/// The `submodule_data` directory relative to `CARGO_MANIFEST_DIR`
fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result<PathBuf, Box<dyn Error>> {
    // Try user defined env first: an explicitly-set variable that points at a
    // missing directory is an error (the user asked for it), but an unset or
    // blank variable silently falls through to the submodule default.
    if let Ok(dir) = env::var(udf_env) {
        let trimmed = dir.trim().to_string();
        if !trimmed.is_empty() {
            let pb = PathBuf::from(trimmed);
            if pb.is_dir() {
                return Ok(pb);
            } else {
                return Err(format!(
                    "the data dir `{}` defined by env {} not found",
                    pb.display(),
                    udf_env
                )
                .into());
            }
        }
    }

    // The env is undefined or its value is trimmed to empty, let's try default dir.

    // env "CARGO_MANIFEST_DIR" is "the directory containing the manifest of your package",
    // set by `cargo run` or `cargo test`, see:
    // https://doc.rust-lang.org/cargo/reference/environment-variables.html
    // Note: env! resolves at compile time, so the fallback is anchored to the
    // crate being built, independent of the process working directory.
    let dir = env!("CARGO_MANIFEST_DIR");

    let pb = PathBuf::from(dir).join(submodule_data);
    if pb.is_dir() {
        Ok(pb)
    } else {
        Err(format!(
            "env `{}` is undefined or has empty value, and the pre-defined data dir `{}` not found\n\
             HINT: try running `git submodule update --init`",
            udf_env,
            pb.display(),
        )
        .into())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    // Exercises every branch of get_data_dir by mutating a dedicated
    // process-global env var. The steps are order-dependent: each phase
    // sets or removes the var before calling, so do not reorder them.
    #[test]
    fn test_data_dir() {
        let udf_env = "get_data_dir";
        let cwd = env::current_dir().unwrap();

        // A directory guaranteed to exist: the parent of the cwd.
        let existing_pb = cwd.join("..");
        let existing = existing_pb.display().to_string();
        let existing_str = existing.as_str();

        let non_existing = cwd.join("non-existing-dir").display().to_string();
        let non_existing_str = non_existing.as_str();

        // Env set but pointing at a missing dir: must be an error even
        // though the fallback exists.
        env::set_var(udf_env, non_existing_str);
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_err());

        // Empty value: treated as unset, falls back to submodule path.
        env::set_var(udf_env, "");
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);

        // Whitespace-only value: trimmed to empty, same fallback behavior.
        env::set_var(udf_env, " ");
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);

        // Env set to a valid dir: that dir wins.
        env::set_var(udf_env, existing_str);
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);

        // Env unset and fallback missing: error.
        env::remove_var(udf_env);
        let res = get_data_dir(udf_env, non_existing_str);
        assert!(res.is_err());

        // Env unset and fallback present: fallback wins.
        let res = get_data_dir(udf_env, existing_str);
        assert!(res.is_ok());
        assert_eq!(res.unwrap(), existing_pb);
    }

    // Smoke test: both public helpers resolve to real directories when the
    // test submodules are checked out (see `git submodule update --init`).
    #[test]
    fn test_happy() {
        let res = arrow_test_data();
        assert!(PathBuf::from(res).is_dir());

        let res = parquet_test_data();
        assert!(PathBuf::from(res).is_dir());
    }
}
14 changes: 7 additions & 7 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async fn parquet_query() {
#[tokio::test]
async fn parquet_single_nan_schema() {
let mut ctx = ExecutionContext::new();
let testdata = arrow::util::test_util::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
.unwrap();
let sql = "SELECT mycol FROM single_nan";
Expand All @@ -144,7 +144,7 @@ async fn parquet_single_nan_schema() {
#[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"]
async fn parquet_list_columns() {
let mut ctx = ExecutionContext::new();
let testdata = arrow::util::test_util::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet(
"list_columns",
&format!("{}/list_columns.parquet", testdata),
Expand Down Expand Up @@ -2009,7 +2009,7 @@ fn aggr_test_schema() -> SchemaRef {
}

async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = datafusion::test_util::arrow_test_data();

// TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once
// unsigned is supported.
Expand Down Expand Up @@ -2049,7 +2049,7 @@ async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
}

fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
let testdata = arrow::util::test_util::arrow_test_data();
let testdata = datafusion::test_util::arrow_test_data();
let schema = aggr_test_schema();
ctx.register_csv(
"aggregate_test_100",
Expand All @@ -2076,7 +2076,7 @@ fn register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()> {
}

fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
let testdata = arrow::util::test_util::parquet_test_data();
let testdata = datafusion::test_util::parquet_test_data();
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
Expand Down Expand Up @@ -3374,7 +3374,7 @@ async fn test_physical_plan_display_indent() {
" CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
];

let data_path = arrow::util::test_util::arrow_test_data();
let data_path = datafusion::test_util::arrow_test_data();
let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
.trim()
.lines()
Expand Down Expand Up @@ -3423,7 +3423,7 @@ async fn test_physical_plan_display_indent_multi_children() {
" CsvExec: source=Path(ARROW_TEST_DATA/csv/aggregate_test_100.csv: [ARROW_TEST_DATA/csv/aggregate_test_100.csv]), has_header=true",
];

let data_path = arrow::util::test_util::arrow_test_data();
let data_path = datafusion::test_util::arrow_test_data();
let actual = format!("{}", displayable(physical_plan.as_ref()).indent())
.trim()
.lines()
Expand Down