feat(read_ file funcs): infer from compressed formats #2639

Merged · 26 commits · Feb 21, 2024
Changes from 3 commits
crates/datafusion_ext/src/planner/relation/mod.rs (43 additions, 1 deletion)
@@ -16,7 +16,7 @@
// under the License.

use std::collections::HashMap;
-use std::path::Path;
+use std::path::{Path, PathBuf};

use async_recursion::async_recursion;
use datafusion::common::{DataFusionError, OwnedTableReference, Result};
@@ -242,6 +242,13 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
.ok_or_else(|| DataFusionError::Plan(format!("strange file extension: {path}")))?
.to_lowercase();

let binding = PathBuf::from(path);
let filename = binding
.file_name()
.ok_or_else(|| DataFusionError::Plan(format!("NO file name provided: {path}")))?
.to_str()
.ok_or_else(|| DataFusionError::Plan(format!("Improper file name: {path}")))?;

// TODO: We can be a bit more sophisticated here and handle compression
// schemes as well.
Ok(match ext.as_str() {
@@ -261,10 +268,45 @@ fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
schema: "public".into(),
table: "read_bson".into(),
},
"gz" => {
Contributor:

I think we'd want to match on all compression types here.

We can use datafusion's FileCompressionType for this.

if let Ok(compression_type) = ext.parse::<FileCompressionType>() {
  let ext = compression_type.get_ext();
  let path = path.trim_end_matches(ext);
  // then just recursively call the function with the extension removed
  infer_func_for_file(path)
} else {
  match ext.as_str() { /* ... */ }
}
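For reference, a fuller sketch of this recursive approach might look like the following. It is only a sketch: it assumes FileCompressionType parses lowercase extension strings such as "gz", that get_ext() returns the matching suffix, and that the import paths below match the datafusion version in use; the "csv_scan" arm simply mirrors one entry of the existing match and elides the rest.

// Sketch of the suggestion above: strip a recognized compression suffix
// and recurse so compressed paths resolve the same way as their plain forms.
// (Trait/import paths may differ between datafusion versions.)
use datafusion::common::{DataFusionError, GetExt, OwnedTableReference, Result};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;

fn infer_func_for_file(path: &str) -> Result<OwnedTableReference> {
    let ext = std::path::Path::new(path)
        .extension()
        .and_then(|e| e.to_str())
        .ok_or_else(|| DataFusionError::Plan(format!("missing file extension: {path}")))?
        .to_lowercase();

    // Known compression suffix: strip it and recurse, so "data.csv.gz"
    // is resolved exactly like "data.csv".
    if let Ok(compression) = ext.parse::<FileCompressionType>() {
        let suffix = compression.get_ext();
        let stripped = path.trim_end_matches(suffix.as_str()).trim_end_matches('.');
        return infer_func_for_file(stripped);
    }

    Ok(match ext.as_str() {
        "csv" => OwnedTableReference::Partial {
            schema: "public".into(),
            table: "csv_scan".into(),
        },
        // ... remaining formats elided ...
        other => {
            return Err(DataFusionError::Plan(format!(
                "unable to infer how to handle file extension: {other}"
            )))
        }
    })
}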

Contributor Author:

I'll use this to include all compression types.

I overlooked the logic you suggested while making my commits; I'll apply it properly as well.

//handling compressed files with .gz extension
infer_func_from_compressed_file(filename)?
}
ext => {
return Err(DataFusionError::Plan(format!(
"unable to infer how to handle file extension: {ext}"
)))
}
})
}

fn infer_func_from_compressed_file(filename: &str) -> Result<OwnedTableReference> {
if filename.contains(".json.gz")
| filename.contains(".json1.gz")
| filename.contains(".ndjson.gz")
{
return Ok(OwnedTableReference::Partial {
schema: "public".into(),
table: "ndjson_scan".into(),
Contributor:

can we use the read_ functions here?

});
} else if filename.contains(".parquet.gz") {
return Ok(OwnedTableReference::Partial {
schema: "public".into(),
table: "parquet_scan".into(),
});
Contributor:

does it make sense to gzip a parquet file, given its internal compression and the fact that gzip will make it difficult/impossible to partition anything from the file?

Contributor:

I think if arrow's parquet reader supports gz compression, then we should too.

Whether it's wise to gzip parquet is another question.

Contributor:

I mean, this isn't a very strong opinion, but I think this is likely to lead people to very poor experiences, and given that, maybe it'd be best to disallow it?

There's no reason we need to recapitulate arrow or datafusion's mistakes.

Contributor @universalmind303 (Feb 13, 2024):

but then if people have gzipped parquet files already created from <insert tool>, it's unreadable. I don't think we should put this limitation on the readers. I could see this being a valid argument for writing though.

Contributor:

> but then if people have gzipped parquet files already created from <insert tool>, it's unreadable. I don't think we should put this limitation on the readers.

I don't think we have a write path that compresses any objects, anyway.

I'm pretty sure this is less about tools that produce gzipped parquet and more about people running gzip from the shell, but that doesn't make it reasonable or correct.

Should we support use-cases which are fundamentally misunderstandings or mistakes, especially when likely to produce bad experiences just because someone might stumble into them?

It's much easier to add something like this later if there's a valid, non-pathological use-case, than it is to spend [the lifetime of the product] doing damage control. Sometimes errors in these cases are more kind than supporting something that's broken but possible.

} else if filename.contains(".csv.gz") {
return Ok(OwnedTableReference::Partial {
schema: "public".into(),
table: "csv_scan".into(),
});
} else if filename.contains(".bson.gz") {
return Ok(OwnedTableReference::Partial {
schema: "public".into(),
table: "read_bson".into(),
});
Contributor:

I'm not 100% sure that this will actually work.

Contributor @universalmind303 (Feb 13, 2024):

I think we should still pass this in to the planner even if it's invalid. I don't think this stage of the planning should be concerned with plan validity. It's only concerned with creating the logical plan from a semantically correct query, even if that plan itself is invalid.

Contributor:

I'm not sure that makes a lot of sense, or at least it's not consistent with the implementation. If we want to implement what you describe, then we split the right side of the string on "." and match the penultimate segment against the formats and the final segment against the compression formats. Really, if it's not this code's responsibility to decide whether the reading function can handle the compression formats, then we should actually ignore them here.

But I don't think that's what we actually want. "Pass invalid arguments through and hope that the downstream code handles it correctly" isn't a reasonable design, and (more importantly) it makes it harder to return a clear error message to users in invalid cases.

Contributor @universalmind303 (Feb 13, 2024):

It's not "hoping downstream code handles it correctly". It's a matter of separation of logic and keeping things DRY. The table function already need to handle this scenario. It's redundant to check it here also

select * from read_bson('some_file.bson.gz'); -- table function already needs to check it
select * from 'some_file.bson.gz'; -- just expands to the above

Contributor:

> It's a matter of separation of logic and keeping things DRY.

My concern is primarily with centralizing the validation logic for all of the formats and table functions (and the DDL-created dispatches to these table providers), so that we can return clear error messages and avoid duplicating validation logic.

> The table function already needs to handle this scenario.

Sort of? The table functions are going to error if they can't read the data or if it's in the wrong format. It's maybe not necessary for them to validate that the path itself is well-formed.

> It's redundant to check it here also.

It is redundant, but so is enumerating every possible compression algorithm and extension variant. We shouldn't need to care about .bson.gz vs .bson.gzip (and so forth). I think the reasonable way to get something helpful and useful here is:

  • split the string on "."; if the last or second-to-last element in the resulting sequence is bson, ndjson, jsonl, or csv, dispatch to the appropriate function (a rough sketch of this follows below).
  • let's leave .json out of this as much as possible because of the ambiguity between text sequences and line-separated json.
  • write tests to validate that the table functions behave correctly with compression and propagate reasonable error messages otherwise.
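For illustration only, a minimal sketch of that split-on-dot dispatch; the helper name and Option-based return are hypothetical, and the table function names simply mirror the ones used elsewhere in this diff.

// Hypothetical helper: inspect the last and second-to-last dot-separated
// segments so "data.csv" and "data.csv.gz" both dispatch to the csv reader,
// regardless of which compression suffix follows the format extension.
fn infer_table_func(filename: &str) -> Option<&'static str> {
    for seg in filename.rsplit('.').take(2) {
        match seg.to_ascii_lowercase().as_str() {
            "csv" => return Some("csv_scan"),
            "ndjson" | "jsonl" => return Some("ndjson_scan"),
            "bson" => return Some("read_bson"),
            "parquet" => return Some("parquet_scan"),
            // plain ".json" intentionally omitted because of the ambiguity
            // between a single json document and line-separated json
            _ => {}
        }
    }
    None
}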

} else {
return Err(DataFusionError::Plan(format!(
"Improper compressed filename with extension .gz : {filename}"
)));
}
}