feat: Allow specifying alt delimiter when reading csv #2365

Merged · 1 commit · Jan 7, 2024
6 changes: 3 additions & 3 deletions crates/datafusion_ext/src/errors.rs
@@ -6,9 +6,6 @@ pub enum ExtensionError {
#[error("Expected argument at index {index}: {what}")]
ExpectedIndexedArgument { index: usize, what: String },

#[error("{0}")]
String(String),

#[error("Unable to find {obj_typ}: '{name}'")]
MissingObject { obj_typ: &'static str, name: String },

@@ -44,6 +41,9 @@ pub enum ExtensionError {

#[error("object store: {0}")]
ObjectStore(String),

#[error("{0}")]
String(String),
}

impl ExtensionError {
143 changes: 109 additions & 34 deletions crates/sqlbuiltins/src/functions/table/object_store.rs
@@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::{sync::Arc, vec};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::FileType;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonFormat;
@@ -31,37 +31,119 @@ use protogen::metastore::types::options::{CredentialsOptions, StorageOptions};
use super::TableFunc;
use crate::functions::BuiltinFunction;

pub const READ_PARQUET: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::PARQUET,
#[derive(Debug, Clone, Copy)]
pub struct ParquetOptionsReader;

impl OptionReader for ParquetOptionsReader {
type Format = ParquetFormat;

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(ParquetFormat::default())
}
}

pub const READ_PARQUET: ObjScanTableFunc<ParquetOptionsReader> = ObjScanTableFunc {
name: "read_parquet",
aliases: &["parquet_scan"],
phantom: PhantomData,
};

pub const READ_CSV: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::CSV,
#[derive(Debug, Clone, Copy)]
pub struct CsvOptionReader;

impl OptionReader for CsvOptionReader {
type Format = CsvFormat;

fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
let mut format = CsvFormat::default().with_schema_infer_max_rec(Some(20480));

if let Some(delimiter) = opts.get("delimiter") {
let delimiter: String = delimiter.clone().try_into()?;
let bs = delimiter.as_bytes();
if bs.len() != 1 {
return Err(ExtensionError::String(
"delimiters for CSV must fit in one byte (e.g. ',')".to_string(),
));
}
let delimiter = bs[0];
format = format.with_delimiter(delimiter);
}

Ok(format)
}
}
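A note on the check above: the length test is in bytes, not characters, so a single multi-byte UTF-8 character is rejected just like a multi-character string. A minimal, self-contained sketch of the same rule (`validate_delimiter` is illustrative, not part of this PR):

```rust
// Hypothetical helper mirroring the one-byte check in CsvOptionReader.
fn validate_delimiter(s: &str) -> Result<u8, String> {
    match s.as_bytes() {
        // Exactly one byte: usable as a CSV delimiter.
        [b] => Ok(*b),
        // Empty, multi-character, or multi-byte UTF-8 input is rejected.
        _ => Err("delimiters for CSV must fit in one byte (e.g. ',')".to_string()),
    }
}

fn main() {
    assert_eq!(validate_delimiter(";"), Ok(b';'));
    assert!(validate_delimiter(";;").is_err()); // two characters, two bytes
    assert!(validate_delimiter("é").is_err()); // one character, two bytes
}
```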

pub const READ_CSV: ObjScanTableFunc<CsvOptionReader> = ObjScanTableFunc {
name: "read_csv",
aliases: &["csv_scan"],
phantom: PhantomData,
};

pub const READ_JSON: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::JSON,
#[derive(Debug, Clone, Copy)]
pub struct JsonOptionsReader;

impl OptionReader for JsonOptionsReader {
type Format = JsonFormat;

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(JsonFormat::default())
}
}

pub const READ_JSON: ObjScanTableFunc<JsonOptionsReader> = ObjScanTableFunc {
name: "read_ndjson",
aliases: &["ndjson_scan"],
phantom: PhantomData,
};

pub trait OptionReader: Sync + Send + Sized {
type Format: FileFormat + WithCompression + 'static;

/// Read user-provided options, and construct a file format using those options.
fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format>;
}
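With this trait in place, adding a new readable format is a matter of one `OptionReader` impl plus a const. A hypothetical sketch (neither `TsvOptionReader` nor `READ_TSV` is part of this PR) that reuses `CsvFormat` with a fixed tab delimiter:

```rust
// Illustrative only: a tab-separated reader built on the same extension
// point, reusing the CSV machinery with a different delimiter.
#[derive(Debug, Clone, Copy)]
pub struct TsvOptionReader;

impl OptionReader for TsvOptionReader {
    type Format = CsvFormat;

    fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
        Ok(CsvFormat::default()
            .with_schema_infer_max_rec(Some(20480))
            .with_delimiter(b'\t'))
    }
}

pub const READ_TSV: ObjScanTableFunc<TsvOptionReader> = ObjScanTableFunc {
    name: "read_tsv",
    aliases: &["tsv_scan"],
    phantom: PhantomData,
};
```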

/// Helper trait for adding the compression option to file formats.
pub trait WithCompression: Sized {
fn with_compression(self, compression: FileCompressionType) -> Result<Self>;
}

impl WithCompression for CsvFormat {
fn with_compression(self, compression: FileCompressionType) -> Result<Self> {
Ok(CsvFormat::with_file_compression_type(self, compression))
}
}

impl WithCompression for JsonFormat {
fn with_compression(self, compression: FileCompressionType) -> Result<Self> {
Ok(JsonFormat::with_file_compression_type(self, compression))
}
}

impl WithCompression for ParquetFormat {
fn with_compression(self, _compression: FileCompressionType) -> Result<Self> {
// TODO: Snappy is a common compression algorithm used with parquet. If we
// want to support it, we'd need to extend the file compression enum with
// our own version.
Err(ExtensionError::String(
"compression not supported for parquet".to_string(),
))
}
}

/// Generic file scan for different file types.
#[derive(Debug, Clone)]
pub struct ObjScanTableFunc {
file_type: FileType,

pub struct ObjScanTableFunc<Opts> {
/// Primary name for the function.
name: &'static str,

/// Additional aliases for this function.
aliases: &'static [&'static str],

phantom: PhantomData<Opts>,
}

impl BuiltinFunction for ObjScanTableFunc {
impl<Opts: OptionReader> BuiltinFunction for ObjScanTableFunc<Opts> {
fn name(&self) -> &'static str {
self.name
}
@@ -97,7 +179,7 @@ impl BuiltinFunction for ObjScanTableFunc {
}

#[async_trait]
impl TableFunc for ObjScanTableFunc {
impl<Opts: OptionReader> TableFunc for ObjScanTableFunc<Opts> {
fn detect_runtime(
&self,
args: &[FuncParamValue],
@@ -155,10 +237,15 @@ impl TableFunc for ObjScanTableFunc {
));
}

let file_compression = match opts.remove("compression") {
// Read in user-provided options and use them to construct the format.
let mut format = Opts::read_options(&opts)?;

// Use the compression provided by the user, or try to infer it from
// the file extension.
let compression: Option<FileCompressionType> = match opts.remove("compression") {
Some(cmp) => {
let cmp: String = cmp.try_into()?;
cmp.parse::<FileCompressionType>()?
Some(cmp.parse::<FileCompressionType>()?)
}
None => {
let path = urls
@@ -168,26 +255,12 @@ impl TableFunc for ObjScanTableFunc {
let path = std::path::Path::new(path.as_ref());
path.extension()
.and_then(|ext| ext.to_string_lossy().as_ref().parse().ok())
.unwrap_or(FileCompressionType::UNCOMPRESSED)
}
};

let ft: Arc<dyn FileFormat> = match &self.file_type {
FileType::CSV => Arc::new(
CsvFormat::default()
.with_file_compression_type(file_compression)
.with_schema_infer_max_rec(Some(20480)),
),
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::JSON => {
Arc::new(JsonFormat::default().with_file_compression_type(file_compression))
}
ft => {
return Err(ExtensionError::String(format!(
"Unsuppored file type: {ft:?}"
)))
}
};
if let Some(compression) = compression {
format = format.with_compression(compression)?;
}
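The `None` arm relies on `FileCompressionType`'s `FromStr` impl to interpret the file extension. A sketch of that inference in isolation (`infer_compression` is illustrative; the exact set of recognized extensions, e.g. "gz", is assumed from datafusion's parser):

```rust
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;

// Illustrative helper: try to parse the path's extension as a compression
// type, yielding None when the extension is absent or unrecognized.
fn infer_compression(path: &str) -> Option<FileCompressionType> {
    std::path::Path::new(path)
        .extension()
        .and_then(|ext| ext.to_string_lossy().as_ref().parse().ok())
}

// infer_compression("data.csv.gz") should yield Some(GZIP), while
// infer_compression("data.csv") yields None ("csv" is not a compression),
// leaving the format uncompressed.
```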

// Optimize creating a table provider for objects by clubbing the same
// store together.
@@ -211,18 +284,20 @@
// Now that we have all urls (grouped by their access), we try and get
// all the objects and turn this into a table provider.

let o = fn_registry
let format: Arc<dyn FileFormat> = Arc::new(format);
let table = fn_registry
.into_values()
.map(|(access, locations)| {
let provider = get_table_provider(ctx, ft.clone(), access, locations.into_iter());
let provider =
get_table_provider(ctx, format.clone(), access, locations.into_iter());
provider
})
.collect::<futures::stream::FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await
.map(|providers| Arc::new(MultiSourceTableProvider::new(providers)) as _)?;

Ok(o)
Ok(table)
}
}
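For context on the "clubbing" above: locations are grouped by the object-store access they share, so one provider is built per store rather than per URL. A simplified sketch of that grouping with stand-in types (the crate's actual types differ):

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Stand-in types: `Access` and `Location` are illustrative.
fn group_by_access<Access: Hash + Eq, Location>(
    pairs: Vec<(Access, Location)>,
) -> HashMap<Access, Vec<Location>> {
    let mut groups: HashMap<Access, Vec<Location>> = HashMap::new();
    for (access, location) in pairs {
        // All locations sharing an access end up in one bucket, so a
        // single table provider can serve them together.
        groups.entry(access).or_default().push(location);
    }
    groups
}
```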

3 changes: 3 additions & 0 deletions testdata/csv/delimiter.csv
@@ -0,0 +1,3 @@
col1;col2;col3
1;hello, world;3.9
2;HELLO, WORLD;4.9
12 changes: 12 additions & 0 deletions testdata/sqllogictests/functions/read_csv.slt
@@ -62,3 +62,15 @@ statement error Note that globbing is not supported for HTTP.
select * from read_csv(
'https://raw.githubusercontent.com/GlareDB/glaredb/main/testdata/sqllogictests_datasources_common/data/*.csv'
);

# Alternative delimiters

query ITR
select * from read_csv('../../testdata/csv/delimiter.csv', delimiter => ';');
----
1 hello, world 3.9
2 HELLO, WORLD 4.9

# Invalid delimiter (longer than one byte)
statement error delimiters for CSV must fit in one byte \(e.g. ','\)
select * from read_csv('../../testdata/csv/delimiter.csv', delimiter => ';;');