Skip to content

Commit

Permalink
feat(read_ file funcs): infer from compressed formats (#2639)
Browse files Browse the repository at this point in the history
Co-authored-by: universalmind303 <universalmind.candy@gmail.com>
  • Loading branch information
hemanth94 and universalmind303 authored Feb 21, 2024
1 parent 5e5875c commit ef464f1
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 7 deletions.
25 changes: 18 additions & 7 deletions crates/datafusion_ext/src/planner/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::collections::HashMap;
use std::path::Path;

use async_recursion::async_recursion;
use datafusion::common::{DataFusionError, OwnedTableReference, Result};
use datafusion::common::{DataFusionError, GetExt, OwnedTableReference, Result};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion::scalar::ScalarValue;
use datafusion::sql::planner::PlannerContext;
Expand Down Expand Up @@ -247,15 +248,19 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
Ok(match ext.as_str() {
"parquet" => OwnedTableReference::Partial {
schema: "public".into(),
table: "parquet_scan".into(),
table: "read_parquet".into(),
},
"xlsx" => OwnedTableReference::Partial {
schema: "public".into(),
table: "read_excel".into(),
},
"csv" => OwnedTableReference::Partial {
schema: "public".into(),
table: "csv_scan".into(),
table: "read_csv".into(),
},
"json" | "jsonl" | "ndjson" => OwnedTableReference::Partial {
schema: "public".into(),
table: "ndjson_scan".into(),
table: "read_ndjson".into(),
},
"bson" => OwnedTableReference::Partial {
schema: "public".into(),
Expand All @@ -266,9 +271,15 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
table: "read_excel".into(),
},
ext => {
return Err(DataFusionError::Plan(format!(
"unable to infer how to handle file extension: {ext}"
)))
if let Ok(compression_type) = ext.parse::<FileCompressionType>() {
let ext = compression_type.get_ext();
let path = path.trim_end_matches(ext.as_str());
infer_func_for_file(path)?
} else {
return Err(DataFusionError::Plan(format!(
"unable to infer how to handle file extension: {ext}"
)));
}
}
})
}
Binary file added testdata/csv/userdata1.csv.bz2
Binary file not shown.
Binary file added testdata/csv/userdata1.csv.gz
Binary file not shown.
Binary file added testdata/csv/userdata1.csv.xz
Binary file not shown.
Binary file added testdata/csv/userdata1.csv.zst
Binary file not shown.
Binary file added testdata/json/userdata1.json.bz2
Binary file not shown.
Binary file added testdata/json/userdata1.json.gz
Binary file not shown.
Binary file added testdata/json/userdata1.json.xz
Binary file not shown.
Binary file added testdata/json/userdata1.json.zst
Binary file not shown.
Binary file added testdata/parquet/userdata1.parquet.bz2
Binary file not shown.
Binary file added testdata/parquet/userdata1.parquet.gz
Binary file not shown.
Binary file added testdata/parquet/userdata1.parquet.xz
Binary file not shown.
Binary file added testdata/parquet/userdata1.parquet.zst
Binary file not shown.
182 changes: 182 additions & 0 deletions testdata/sqllogictests/infer.slt
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,185 @@ select count(*) from './testdata/parquet/*.parquet'
statement error missing file extension
select count(*) from './testdata/parquet/*'




#Tests for inferring table functions from compressed file formats

#Tests for CSV with .gz, .bz2, .xz, .zst
#csv.gz
query
select count(*) from './testdata/csv/userdata1.csv.gz'
----
1000

#csv.gz
query IT
select id, "./testdata/csv/userdata1.csv.gz".first_name
from './testdata/csv/userdata1.csv.gz'
order by id
limit 1
----
1 Amanda

#csv.bz2
query
select count(*) from './testdata/csv/userdata1.csv.bz2'
----
1000

#csv.bz2
query IT
select id, "./testdata/csv/userdata1.csv.bz2".first_name
from './testdata/csv/userdata1.csv.bz2'
order by id
limit 1
----
1 Amanda

#csv.xz
query
select count(*) from './testdata/csv/userdata1.csv.xz'
----
1000

#csv.xz
query IT
select id, "./testdata/csv/userdata1.csv.xz".first_name
from './testdata/csv/userdata1.csv.xz'
order by id
limit 1
----
1 Amanda

#csv.zst
query
select count(*) from './testdata/csv/userdata1.csv.zst'
----
1000

#csv.zst
query IT
select id, "./testdata/csv/userdata1.csv.zst".first_name
from './testdata/csv/userdata1.csv.zst'
order by id
limit 1
----
1 Amanda



#Tests for json with .gz, .bz2, .xz, .zst
#json.gz
query
select count(*) from './testdata/json/userdata1.json.gz'
----
1000

#json.gz
query IT
select id, "./testdata/json/userdata1.json.gz".first_name
from './testdata/json/userdata1.json.gz'
order by id
limit 1
----
1 Amanda

#json.bz2
query
select count(*) from './testdata/json/userdata1.json.bz2'
----
1000

#json.bz2
query IT
select id, "./testdata/json/userdata1.json.bz2".first_name
from './testdata/json/userdata1.json.bz2'
order by id
limit 1
----
1 Amanda

#json.xz
query
select count(*) from './testdata/json/userdata1.json.xz'
----
1000

#json.xz
query IT
select id, "./testdata/json/userdata1.json.xz".first_name
from './testdata/json/userdata1.json.xz'
order by id
limit 1
----
1 Amanda

#json.zst
query
select count(*) from './testdata/json/userdata1.json.zst'
----
1000

#json.zst
query IT
select id, "./testdata/json/userdata1.json.zst".first_name
from './testdata/json/userdata1.json.zst'
order by id
limit 1
----
1 Amanda



#For infering function from parquet compressed formats .bz2, .xz, .zst, .gz
#parquet.bz2
statement error compression not supported for parquet
select count(*) from './testdata/parquet/userdata1.parquet.bz2'


#parquet.bz2
statement error compression not supported for parquet
select id, "./testdata/parquet/userdata1.parquet.bz2".first_name
from './testdata/parquet/userdata1.parquet.bz2'
order by id
limit 1


#parquet.xz
statement error compression not supported for parquet
select count(*) from './testdata/parquet/userdata1.parquet.xz'


#parquet.xz
statement error compression not supported for parquet
select id, "./testdata/parquet/userdata1.parquet.xz".first_name
from './testdata/parquet/userdata1.parquet.xz'
order by id
limit 1


#parquet.zst
statement error compression not supported for parquet
select count(*) from './testdata/parquet/userdata1.parquet.zst'


#parquet.zst
statement error compression not supported for parquet
select id, "./testdata/parquet/userdata1.parquet.zst".first_name
from './testdata/parquet/userdata1.parquet.zst'
order by id
limit 1


#parquet.gz
statement error compression not supported for parquet
select count(*) from './testdata/parquet/userdata1.parquet.gz'


#parquet.gz
statement error compression not supported for parquet
select id, "./testdata/parquet/userdata1.parquet.gz".first_name
from './testdata/parquet/userdata1.parquet.gz'
order by id
limit 1

0 comments on commit ef464f1

Please sign in to comment.