feat: Allow specifying alt delimiter when reading csv (#2365)
```
> select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';');
┌───────┬──────────────┬─────────┐
│  col1 │ col2         │    col3 │
│    ── │ ──           │      ── │
│ Int64 │ Utf8         │ Float64 │
╞═══════╪══════════════╪═════════╡
│     1 │ hello, world │ 3.90000 │
│     2 │ HELLO, WORLD │ 4.90000 │
└───────┴──────────────┴─────────┘
```

Depends on #2364

Adds the plumbing for accepting options from the user in our various scan
functions. The only option wired up right now is 'delimiter' for CSV, but
others should now be straightforward to add.
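A minimal sketch of what wiring up another option could look like, following the same pattern as `delimiter` inside `CsvOptionReader::read_options`. The `header` option name, the string-to-bool parsing, and the exact import paths are assumptions for illustration; `CsvFormat::with_has_header` is the DataFusion builder this would presumably call:

```rust
use std::collections::HashMap;

use datafusion::datasource::file_format::csv::CsvFormat;

// `FuncParamValue` and `ExtensionError` come from this repo; the import
// paths here are approximations, not confirmed by the diff below.
use datafusion_ext::errors::ExtensionError;
use datafusion_ext::functions::FuncParamValue;

type Result<T, E = ExtensionError> = std::result::Result<T, E>;

// Hypothetical `header` option riding the same plumbing as `delimiter`.
// The option name and the bool parsing are illustrative assumptions.
fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<CsvFormat> {
    let mut format = CsvFormat::default().with_schema_infer_max_rec(Some(20480));

    if let Some(header) = opts.get("header") {
        let header: String = header.clone().try_into()?;
        let has_header = header.parse::<bool>().map_err(|_| {
            ExtensionError::String("header must be 'true' or 'false'".to_string())
        })?;
        format = format.with_has_header(has_header);
    }

    Ok(format)
}
```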
scsmithr authored Jan 7, 2024
1 parent c21a473 commit 57c8d8b
Showing 4 changed files with 127 additions and 37 deletions.
6 changes: 3 additions & 3 deletions crates/datafusion_ext/src/errors.rs
@@ -6,9 +6,6 @@ pub enum ExtensionError {
#[error("Expected argument at index {index}: {what}")]
ExpectedIndexedArgument { index: usize, what: String },

#[error("{0}")]
String(String),

#[error("Unable to find {obj_typ}: '{name}'")]
MissingObject { obj_typ: &'static str, name: String },

@@ -44,6 +41,9 @@ pub enum ExtensionError {

#[error("object store: {0}")]
ObjectStore(String),

#[error("{0}")]
String(String),
}

impl ExtensionError {
143 changes: 109 additions & 34 deletions crates/sqlbuiltins/src/functions/table/object_store.rs
@@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::{sync::Arc, vec};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::FileType;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonFormat;
@@ -31,37 +31,119 @@ use protogen::metastore::types::options::{CredentialsOptions, StorageOptions};
use super::TableFunc;
use crate::functions::BuiltinFunction;

pub const READ_PARQUET: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::PARQUET,
#[derive(Debug, Clone, Copy)]
pub struct ParquetOptionsReader;

impl OptionReader for ParquetOptionsReader {
type Format = ParquetFormat;

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(ParquetFormat::default())
}
}

pub const READ_PARQUET: ObjScanTableFunc<ParquetOptionsReader> = ObjScanTableFunc {
name: "read_parquet",
aliases: &["parquet_scan"],
phantom: PhantomData,
};

pub const READ_CSV: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::CSV,
#[derive(Debug, Clone, Copy)]
pub struct CsvOptionReader;

impl OptionReader for CsvOptionReader {
type Format = CsvFormat;

fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
let mut format = CsvFormat::default().with_schema_infer_max_rec(Some(20480));

if let Some(delimiter) = opts.get("delimiter") {
let delimiter: String = delimiter.clone().try_into()?;
let bs = delimiter.as_bytes();
if bs.len() != 1 {
return Err(ExtensionError::String(
"delimiters for CSV must fit in one byte (e.g. ',')".to_string(),
));
}
let delimiter = bs[0];
format = format.with_delimiter(delimiter);
}

Ok(format)
}
}

pub const READ_CSV: ObjScanTableFunc<CsvOptionReader> = ObjScanTableFunc {
name: "read_csv",
aliases: &["csv_scan"],
phantom: PhantomData,
};

pub const READ_JSON: ObjScanTableFunc = ObjScanTableFunc {
file_type: FileType::JSON,
#[derive(Debug, Clone, Copy)]
pub struct JsonOptionsReader;

impl OptionReader for JsonOptionsReader {
type Format = JsonFormat;

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(JsonFormat::default())
}
}

pub const READ_JSON: ObjScanTableFunc<JsonOptionsReader> = ObjScanTableFunc {
name: "read_ndjson",
aliases: &["ndjson_scan"],
phantom: PhantomData,
};

pub trait OptionReader: Sync + Send + Sized {
type Format: FileFormat + WithCompression + 'static;

/// Read user-provided options, and construct a file format using those options.
fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format>;
}

/// Helper trait for adding the compression option to file formats.
pub trait WithCompression: Sized {
fn with_compression(self, compression: FileCompressionType) -> Result<Self>;
}

impl WithCompression for CsvFormat {
fn with_compression(self, compression: FileCompressionType) -> Result<Self> {
Ok(CsvFormat::with_file_compression_type(self, compression))
}
}

impl WithCompression for JsonFormat {
fn with_compression(self, compression: FileCompressionType) -> Result<Self> {
Ok(JsonFormat::with_file_compression_type(self, compression))
}
}

impl WithCompression for ParquetFormat {
fn with_compression(self, _compression: FileCompressionType) -> Result<Self> {
// TODO: Snappy is a common compression algo to use with parquet. If we want
// to support it, we'd need to extend the file compression enum with our
// own version.
Err(ExtensionError::String(
"compression not supported for parquet".to_string(),
))
}
}

/// Generic file scan for different file types.
#[derive(Debug, Clone)]
pub struct ObjScanTableFunc {
file_type: FileType,

pub struct ObjScanTableFunc<Opts> {
/// Primary name for the function.
name: &'static str,

/// Additional aliases for this function.
aliases: &'static [&'static str],

phantom: PhantomData<Opts>,
}

impl BuiltinFunction for ObjScanTableFunc {
impl<Opts: OptionReader> BuiltinFunction for ObjScanTableFunc<Opts> {
fn name(&self) -> &'static str {
self.name
}
@@ -97,7 +179,7 @@ impl BuiltinFunction for ObjScanTableFunc {
}

#[async_trait]
impl TableFunc for ObjScanTableFunc {
impl<Opts: OptionReader> TableFunc for ObjScanTableFunc<Opts> {
fn detect_runtime(
&self,
args: &[FuncParamValue],
@@ -155,10 +237,15 @@ impl TableFunc for ObjScanTableFunc {
));
}

let file_compression = match opts.remove("compression") {
// Read in user provided options and use them to construct the format.
let mut format = Opts::read_options(&opts)?;

// Read in compression if provided by the user, or try to infer it from
// the file extension.
let compression: Option<FileCompressionType> = match opts.remove("compression") {
Some(cmp) => {
let cmp: String = cmp.try_into()?;
cmp.parse::<FileCompressionType>()?
Some(cmp.parse::<FileCompressionType>()?)
}
None => {
let path = urls
@@ -168,26 +255,12 @@ impl TableFunc for ObjScanTableFunc {
let path = std::path::Path::new(path.as_ref());
path.extension()
.and_then(|ext| ext.to_string_lossy().as_ref().parse().ok())
.unwrap_or(FileCompressionType::UNCOMPRESSED)
}
};

let ft: Arc<dyn FileFormat> = match &self.file_type {
FileType::CSV => Arc::new(
CsvFormat::default()
.with_file_compression_type(file_compression)
.with_schema_infer_max_rec(Some(20480)),
),
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::JSON => {
Arc::new(JsonFormat::default().with_file_compression_type(file_compression))
}
ft => {
return Err(ExtensionError::String(format!(
"Unsuppored file type: {ft:?}"
)))
}
};
if let Some(compression) = compression {
format = format.with_compression(compression)?;
}

// Optimize creating a table provider for objects by clubbing the same
// store together.
@@ -211,18 +284,20 @@ impl TableFunc for ObjScanTableFunc {
// Now that we have all urls (grouped by their access), we try and get
// all the objects and turn this into a table provider.

let o = fn_registry
let format: Arc<dyn FileFormat> = Arc::new(format);
let table = fn_registry
.into_values()
.map(|(access, locations)| {
let provider = get_table_provider(ctx, ft.clone(), access, locations.into_iter());
let provider =
get_table_provider(ctx, format.clone(), access, locations.into_iter());
provider
})
.collect::<futures::stream::FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await
.map(|providers| Arc::new(MultiSourceTableProvider::new(providers)) as _)?;

Ok(o)
Ok(table)
}
}

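For reference, the fallback inference in the hunk above leans on DataFusion's `FromStr` impl for `FileCompressionType` (the same impl the explicit `compression` option goes through). A minimal sketch of that fallback, with illustrative paths:

```rust
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;

// Infer compression from a path's extension. `None` means the extension
// didn't parse as a known compression type, in which case the scan above
// leaves the format's default (uncompressed) in place.
fn infer_compression(path: &str) -> Option<FileCompressionType> {
    std::path::Path::new(path)
        .extension()
        .and_then(|ext| ext.to_string_lossy().as_ref().parse().ok())
}

// e.g. infer_compression("data/users.csv.gz") would yield GZIP, while
// infer_compression("data/users.csv") yields None.
```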
3 changes: 3 additions & 0 deletions testdata/csv/delimiter.csv
@@ -0,0 +1,3 @@
col1;col2;col3
1;hello, world;3.9
2;HELLO, WORLD;4.9
12 changes: 12 additions & 0 deletions testdata/sqllogictests/functions/read_csv.slt
@@ -62,3 +62,15 @@ statement error Note that globbing is not supported for HTTP.
select * from read_csv(
'https://raw.githubusercontent.com/GlareDB/glaredb/main/testdata/sqllogictests_datasources_common/data/*.csv'
);

# Alternative delimiters

query ITR
select * from read_csv('../../testdata/csv/delimiter.csv', delimiter => ';');
----
1 hello, world 3.9
2 HELLO, WORLD 4.9

# Invalid delimiter (longer than one byte)
statement error delimiters for CSV must fit in one byte \(e.g. ','\)
select * from read_csv('../../testdata/csv/delimiter.csv', delimiter => ';;');
