feat: Allow specifying alt delimiter when reading csv

scsmithr committed Jan 7, 2024
1 parent c21a473 commit 58e303a
Showing 4 changed files with 127 additions and 37 deletions.
6 changes: 3 additions & 3 deletions crates/datafusion_ext/src/errors.rs
@@ -6,9 +6,6 @@ pub enum ExtensionError {
#[error("Expected argument at index {index}: {what}")]
ExpectedIndexedArgument { index: usize, what: String },

#[error("{0}")]
String(String),

#[error("Unable to find {obj_typ}: '{name}'")]
MissingObject { obj_typ: &'static str, name: String },

@@ -44,6 +41,9 @@ pub enum ExtensionError {

#[error("object store: {0}")]
ObjectStore(String),

#[error("{0}")]
String(String),
}

impl ExtensionError {
143 changes: 109 additions & 34 deletions crates/sqlbuiltins/src/functions/table/object_store.rs
@@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::{sync::Arc, vec};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::FileType;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonFormat;
@@ -31,37 +31,119 @@ use protogen::metastore::types::options::{CredentialsOptions, StorageOptions};
use super::TableFunc;
use crate::functions::BuiltinFunction;

pub const READ_PARQUET: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::PARQUET,
#[derive(Debug, Clone, Copy)]
pub struct ParquetOptionsReader;

impl OptionReader for ParquetOptionsReader {
type Format = ParquetFormat;

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(ParquetFormat::default())
}
}

pub const READ_PARQUET: ObjScanTableFunc<ParquetOptionsReader> = ObjScanTableFunc {
name: "read_parquet",
aliases: &["parquet_scan"],
phantom: PhantomData,
};

pub const READ_CSV: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::CSV,
#[derive(Debug, Clone, Copy)]
pub struct CsvOptionReader;

impl OptionReader for CsvOptionReader {
type Format = CsvFormat;

fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
let mut format = CsvFormat::default().with_schema_infer_max_rec(Some(20480));

if let Some(delimiter) = opts.get("delimiter") {
let delimiter: String = delimiter.clone().try_into()?;
let bs = delimiter.as_bytes();
if bs.len() != 1 {
return Err(ExtensionError::String(
"delimiters for CSV must fit in one byte (e.g. ',')".to_string(),
));
}
let delimiter = bs[0];
format = format.with_delimiter(delimiter);
}

Ok(format)
}
}
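As a minimal standalone sketch of the one-byte check above (outside the DataFusion types; `parse_delimiter` is a hypothetical helper, not part of this commit):

```rust
/// Hypothetical helper mirroring the validation in CsvOptionReader: a CSV
/// delimiter must be exactly one byte (e.g. ',', ';', '\t').
fn parse_delimiter(s: &str) -> Result<u8, String> {
    let bs = s.as_bytes();
    if bs.len() != 1 {
        return Err("delimiters for CSV must fit in one byte (e.g. ',')".to_string());
    }
    Ok(bs[0])
}

fn main() {
    assert_eq!(parse_delimiter(";"), Ok(b';'));
    // Multi-byte input is rejected, including multi-byte UTF-8 like 'é'.
    assert!(parse_delimiter(";;").is_err());
    assert!(parse_delimiter("é").is_err());
}
```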

pub const READ_CSV: ObjScanTableFunc<CsvOptionReader> = ObjScanTableFunc {
name: "read_csv",
aliases: &["csv_scan"],
phantom: PhantomData,
};

pub const READ_JSON: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::JSON,
#[derive(Debug, Clone, Copy)]
pub struct JsonOptionsReader;

impl OptionReader for JsonOptionsReader {
type Format = JsonFormat;

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(JsonFormat::default())
}
}

pub const READ_JSON: ObjScanTableFunc<JsonOptionsReader> = ObjScanTableFunc {
name: "read_ndjson",
aliases: &["ndjson_scan"],
phantom: PhantomData,
};

pub trait OptionReader: Sync + Send + Sized {
type Format: FileFormat + WithCompression + 'static;

/// Read user provided options, and construct a file format using those options.
fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format>;
}
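For illustration, a hedged sketch of how a new format could plug into this trait; `TsvOptionsReader` and `READ_TSV` are hypothetical and not part of this commit:

```rust
// Hypothetical: a reader that always produces a tab-delimited CsvFormat,
// assuming the same imports as this file (CsvFormat, FuncParamValue, Result).
#[derive(Debug, Clone, Copy)]
pub struct TsvOptionsReader;

impl OptionReader for TsvOptionsReader {
    type Format = CsvFormat;

    fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
        Ok(CsvFormat::default().with_delimiter(b'\t'))
    }
}

// It would then be wired up exactly like READ_CSV:
// pub const READ_TSV: ObjScanTableFunc<TsvOptionsReader> = ObjScanTableFunc {
//     name: "read_tsv",
//     aliases: &["tsv_scan"],
//     phantom: PhantomData,
// };
```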

/// Helper trait for adding the compression option to file formats.
pub trait WithCompression: Sized {
fn with_compression(self, compression: FileCompressionType) -> Result<Self>;
}

impl WithCompression for CsvFormat {
fn with_compression(self, compression: FileCompressionType) -> Result<Self> {
Ok(CsvFormat::with_file_compression_type(self, compression))
}
}

impl WithCompression for JsonFormat {
fn with_compression(self, compression: FileCompressionType) -> Result<Self> {
Ok(JsonFormat::with_file_compression_type(self, compression))
}
}

impl WithCompression for ParquetFormat {
fn with_compression(self, _compression: FileCompressionType) -> Result<Self> {
// TODO: Snappy is a common compression algo to use with parquet. If we
// want to support it, we'd need to extend the file compression enum with
// our own version.
Err(ExtensionError::String(
"compression not supported for parquet".to_string(),
))
}
}
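Returning `Result` from `with_compression` lets a format that cannot wrap a `FileCompressionType` (parquet) reject it at runtime while callers stay generic. A sketch of that call shape, assuming this file's imports; `apply_compression` is a hypothetical helper, not part of this commit:

```rust
// Hypothetical helper mirroring how the scan function below applies the
// option: any Format implementing WithCompression accepts or rejects a
// compression setting uniformly.
fn apply_compression<F: WithCompression>(
    format: F,
    compression: Option<FileCompressionType>,
) -> Result<F> {
    match compression {
        Some(c) => format.with_compression(c),
        None => Ok(format),
    }
}
```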

/// Generic file scan for different file types.
#[derive(Debug, Clone)]
pub struct ObjScanTableFunc {
file_type: FileType,

pub struct ObjScanTableFunc<Opts> {
/// Primary name for the function.
name: &'static str,

/// Additional aliases for this function.
aliases: &'static [&'static str],

phantom: PhantomData<Opts>,
}
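The `phantom` field is the standard zero-sized marker for a type parameter that is used only in impls and never stored; a minimal sketch of the pattern, independent of this codebase:

```rust
use std::marker::PhantomData;

// Opts only selects behavior at compile time, so the struct stores a
// zero-sized PhantomData marker instead of a value of that type.
struct Scanner<Opts> {
    name: &'static str,
    phantom: PhantomData<Opts>,
}

struct CsvOpts;

// PhantomData is a unit value, so consts remain easy to construct.
const CSV_SCANNER: Scanner<CsvOpts> = Scanner {
    name: "read_csv",
    phantom: PhantomData,
};
```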

impl BuiltinFunction for ObjScanTableFunc {
impl<Opts: OptionReader> BuiltinFunction for ObjScanTableFunc<Opts> {
fn name(&self) -> &'static str {
self.name
}
@@ -97,7 +179,7 @@ impl BuiltinFunction for ObjScanTableFunc {
}

#[async_trait]
impl TableFunc for ObjScanTableFunc {
impl<Opts: OptionReader> TableFunc for ObjScanTableFunc<Opts> {
fn detect_runtime(
&self,
args: &[FuncParamValue],
@@ -155,10 +237,15 @@ impl TableFunc for ObjScanTableFunc {
));
}

let file_compression = match opts.remove("compression") {
// Read in user provided options and use them to construct the format.
let mut format = Opts::read_options(&opts)?;

// Read the compression if provided by the user, or try to infer it from
// the file extension.
let compression: Option<FileCompressionType> = match opts.remove("compression") {
Some(cmp) => {
let cmp: String = cmp.try_into()?;
cmp.parse::<FileCompressionType>()?
Some(cmp.parse::<FileCompressionType>()?)
}
None => {
let path = urls
@@ -168,26 +255,12 @@ impl TableFunc for ObjScanTableFunc {
let path = std::path::Path::new(path.as_ref());
path.extension()
.and_then(|ext| ext.to_string_lossy().as_ref().parse().ok())
.unwrap_or(FileCompressionType::UNCOMPRESSED)
}
};

let ft: Arc<dyn FileFormat> = match &self.file_type {
FileType::CSV => Arc::new(
CsvFormat::default()
.with_file_compression_type(file_compression)
.with_schema_infer_max_rec(Some(20480)),
),
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::JSON => {
Arc::new(JsonFormat::default().with_file_compression_type(file_compression))
}
ft => {
return Err(ExtensionError::String(format!(
"Unsuppored file type: {ft:?}"
)))
}
};
if let Some(compression) = compression {
format = format.with_compression(compression)?;
}

// Optimize creating table providers by grouping objects that share the
// same store together.
@@ -211,18 +284,20 @@ impl TableFunc for ObjScanTableFunc {
// Now that we have all urls (grouped by their access), we try and get
// all the objects and turn this into a table provider.

let o = fn_registry
let format: Arc<dyn FileFormat> = Arc::new(format);
let table = fn_registry
.into_values()
.map(|(access, locations)| {
let provider = get_table_provider(ctx, ft.clone(), access, locations.into_iter());
let provider =
get_table_provider(ctx, format.clone(), access, locations.into_iter());
provider
})
.collect::<futures::stream::FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await
.map(|providers| Arc::new(MultiSourceTableProvider::new(providers)) as _)?;

Ok(o)
Ok(table)
}
}
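The extension-based inference earlier in this hunk leans on `FileCompressionType`'s `FromStr`. A self-contained sketch of the same idea, with an assumed extension mapping (the exact set DataFusion accepts may differ):

```rust
use std::path::Path;

// Sketch: map a file path's extension to a compression name, treating
// unknown extensions (e.g. plain "csv") as uncompressed.
fn infer_compression(path: &str) -> Option<&'static str> {
    let ext = Path::new(path).extension()?.to_string_lossy().to_lowercase();
    match ext.as_str() {
        "gz" => Some("gzip"),
        "bz2" => Some("bzip2"),
        "xz" => Some("xz"),
        "zst" => Some("zstd"),
        _ => None,
    }
}

fn main() {
    assert_eq!(infer_compression("data/users.csv.gz"), Some("gzip"));
    assert_eq!(infer_compression("data/users.csv"), None);
}
```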

3 changes: 3 additions & 0 deletions testdata/csv/delimiter.csv
@@ -0,0 +1,3 @@
col1;col2;col3
1;hello, world;3.9
2;HELLO, WORLD;4.9
12 changes: 12 additions & 0 deletions testdata/sqllogictests/functions/read_csv.slt
@@ -62,3 +62,15 @@ statement error Note that globbing is not supported for HTTP.
select * from read_csv(
'https://raw.githubusercontent.com/GlareDB/glaredb/main/testdata/sqllogictests_datasources_common/data/*.csv'
);

# Alternative delimiters

query ITR
select * from read_csv('../../testdata/csv/delimiter.csv', delimiter => ';');
----
1 hello, world 3.9
2 HELLO, WORLD 4.9

# Invalid delimiter (longer than one byte)
statement error delimiters for CSV must fit in one byte \(e.g. ','\)
select * from read_csv('../../testdata/csv/delimiter.csv', delimiter => ';;');
