Fix failing tests
berkaysynnada committed May 9, 2024
1 parent d64de7e commit 9d81601
Showing 9 changed files with 49 additions and 136 deletions.
6 changes: 3 additions & 3 deletions datafusion/common/src/config.rs
@@ -1566,7 +1566,7 @@ config_namespace! {
pub struct CsvOptions {
/// Specifies whether there is a CSV header (i.e. the first line
/// consists of column names). If not specified, uses default from
- /// the `CREATE TABLE` command, if any.
+ /// the session state, if any.
pub has_header: Option<bool>, default = None
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
@@ -1609,8 +1609,8 @@ impl CsvOptions {

/// Returns true if the first line is a header. If format options does not
/// specify whether there is a header, consults the configuration.
- pub fn has_header(&self, config_opt: &ConfigOptions) -> bool {
- self.has_header.unwrap_or(config_opt.catalog.has_header)
+ pub fn has_header(&self) -> Option<bool> {
+ self.has_header
}

/// The character separating values within a row.
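Note: with this change, `CsvOptions::has_header` becomes a plain accessor returning the tri-state `Option<bool>`, and resolving the default moves to call sites. A minimal sketch of the new resolution pattern, using only the names visible in this diff:

    use datafusion_common::config::{ConfigOptions, CsvOptions};

    // None means "not set on the format"; fall back to the session's
    // catalog default, exactly as the call sites below now do.
    fn resolve_has_header(options: &CsvOptions, config: &ConfigOptions) -> bool {
        options.has_header.unwrap_or(config.catalog.has_header)
    }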
15 changes: 10 additions & 5 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -40,7 +40,7 @@ use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{DataType, Field, Fields, Schema};
- use datafusion_common::config::{ConfigOptions, CsvOptions};
+ use datafusion_common::config::CsvOptions;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
@@ -142,8 +142,8 @@ impl CsvFormat {
}

/// True if the first line is a header.
- pub fn has_header(&self, config_opt: &ConfigOptions) -> bool {
- self.options.has_header(config_opt)
+ pub fn has_header(&self) -> Option<bool> {
+ self.options.has_header
}

/// The character separating values within a row.
@@ -245,7 +245,9 @@ impl FileFormat for CsvFormat {
conf,
// If format options does not specify whether there is a header,
// we consult configuration options.
- self.options.has_header(state.config_options()),
+ self.options
+ .has_header
+ .unwrap_or(state.config_options().catalog.has_header),
self.options.delimiter,
self.options.quote,
self.options.escape,
@@ -303,7 +305,10 @@ impl CsvFormat {
while let Some(chunk) = stream.next().await.transpose()? {
let format = arrow::csv::reader::Format::default()
.with_header(
- self.options.has_header(state.config_options()) && first_chunk,
+ self.options
+ .has_header
+ .unwrap_or(state.config_options().catalog.has_header)
+ && first_chunk,
)
.with_delimiter(self.options.delimiter);

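Note: schema inference now resolves the header flag before handing it to arrow's CSV reader. A self-contained sketch of that `Format` API, assuming arrow-rs's `Format::infer_schema` signature (the real code streams object-store chunks rather than an in-memory buffer):

    use std::io::Cursor;

    use arrow::csv::reader::Format;
    use arrow::error::ArrowError;

    fn infer_csv_schema() -> Result<(), ArrowError> {
        let data = "c1,c2\n1,2\n3,4\n"; // hypothetical input
        let format = Format::default()
            .with_header(true) // the resolved flag, as in the diff above
            .with_delimiter(b',');
        // Reads up to `max_records` records; returns the schema and how
        // many records were actually read.
        let (schema, _records_read) =
            format.infer_schema(Cursor::new(data), Some(100))?;
        println!("{schema:?}");
        Ok(())
    }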
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/stream.rs
@@ -30,7 +30,7 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use futures::StreamExt;

- use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
+ use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
@@ -58,11 +58,22 @@ impl TableProviderFactory for StreamTableFactory {
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
let location = cmd.location.clone();
let encoding = cmd.file_type.parse()?;
+ let header = if let Ok(opt) = cmd
+ .options
+ .get("format.has_header")
+ .map(|has_header| bool::from_str(has_header))
+ .transpose()
+ {
+ opt.unwrap_or(false)
+ } else {
+ return config_err!("format.has_header can either be true or false");
+ };

let config = StreamConfig::new_file(schema, location.into())
.with_encoding(encoding)
.with_order(cmd.order_exprs.clone())
.with_batch_size(state.config().batch_size())
+ .with_header(header)
.with_constraints(cmd.constraints.clone());

Ok(Arc::new(StreamTable(Arc::new(config))))
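Note: the new block turns the string option into a tri-state via `Option<Result<bool>>` and `transpose()`: a missing `format.has_header` key defaults to false, while a malformed value becomes a config error instead of being ignored. A small sketch of the same logic in isolation (`parse_has_header` is a hypothetical helper; `bool::from_str` accepts only the exact strings "true" and "false"):

    use std::str::FromStr;

    fn parse_has_header(opt: Option<&str>) -> Result<bool, String> {
        match opt.map(bool::from_str).transpose() {
            // Key absent (None) defaults to false; a valid value is used as-is.
            Ok(v) => Ok(v.unwrap_or(false)),
            // Present but unparsable: reject, mirroring the config_err! above.
            Err(_) => Err("format.has_header can either be true or false".to_string()),
        }
    }

    fn main() {
        assert_eq!(parse_has_header(None), Ok(false));
        assert_eq!(parse_has_header(Some("true")), Ok(true));
        assert!(parse_has_header(Some("TRUE")).is_err()); // case-sensitive
    }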
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/mod.rs
@@ -85,7 +85,7 @@ async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
c13 VARCHAR NOT NULL
)
STORED AS CSV
- LOCATION '{testdata}/csv/aggregate_test_100.csv
+ LOCATION '{testdata}/csv/aggregate_test_100.csv'
OPTIONS ('format.has_header' 'true')
"
))
4 changes: 2 additions & 2 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -238,7 +238,7 @@ async fn roundtrip_custom_listing_tables() -> Result<()> {
STORED AS CSV
WITH ORDER (a ASC, b ASC)
WITH ORDER (c ASC)
- LOCATION '../core/tests/data/window_2.csv';
+ LOCATION '../core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true')";

let plan = ctx.state().create_logical_plan(query).await?;
@@ -268,7 +268,7 @@ async fn roundtrip_logical_plan_aggregation_with_pk() -> Result<()> {
STORED AS CSV
WITH ORDER (a ASC, b ASC)
WITH ORDER (c ASC)
- LOCATION '../core/tests/data/window_2.csv';
+ LOCATION '../core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true')",
)
.await?;
105 changes: 20 additions & 85 deletions datafusion/sql/src/parser.rs
@@ -101,12 +101,10 @@ pub struct CopyToStatement {
pub target: String,
/// Partition keys
pub partitioned_by: Vec<String>,
- /// Indicates whether there is a header row (e.g. CSV)
- pub has_header: bool,
/// File type (Parquet, NDJSON, CSV etc.)
pub stored_as: Option<String>,
/// Target specific options
- pub options: Vec<(String, Value)>,
+ pub options: HashMap<String, String>,
}

impl fmt::Display for CopyToStatement {
@@ -129,7 +127,10 @@ impl fmt::Display for CopyToStatement {
}

if !options.is_empty() {
- let opts: Vec<_> = options.iter().map(|(k, v)| format!("{k} {v}")).collect();
+ let opts: Vec<_> = options
+ .iter()
+ .map(|(k, v)| format!("'{k}' '{v}'"))
+ .collect();
write!(f, " OPTIONS ({})", opts.join(", "))?;
}

@@ -386,8 +387,7 @@ impl<'a> DFParser<'a> {
stored_as: Option<String>,
target: Option<String>,
partitioned_by: Option<Vec<String>>,
- has_header: Option<bool>,
- options: Option<Vec<(String, Value)>>,
+ options: Option<HashMap<String, String>>,
}

let mut builder = Builder::default();
@@ -423,7 +423,7 @@
}
Keyword::OPTIONS => {
ensure_not_set(&builder.options, "OPTIONS")?;
- builder.options = Some(self.parse_value_options()?);
+ builder.options = Some(self.parse_string_options()?);
}
_ => {
unreachable!()
@@ -451,9 +451,8 @@
source,
target,
partitioned_by: builder.partitioned_by.unwrap_or(vec![]),
- has_header: builder.has_header.unwrap_or(false),
stored_as: builder.stored_as,
- options: builder.options.unwrap_or(vec![]),
+ options: builder.options.unwrap_or(HashMap::new()),
}))
}

@@ -835,33 +834,6 @@
}
Ok(options)
}

- /// Parses (key value) style options into a map of String --> [`Value`].
- ///
- /// Unlike [`Self::parse_string_options`], this method supports
- /// keywords as key names as well as multiple value types such as
- /// Numbers as well as Strings.
- fn parse_value_options(&mut self) -> Result<Vec<(String, Value)>, ParserError> {
- let mut options = vec![];
- self.parser.expect_token(&Token::LParen)?;
-
- loop {
- let key = self.parse_option_key()?;
- let value = self.parse_option_value()?;
- options.push((key, value));
- let comma = self.parser.consume_token(&Token::Comma);
- if self.parser.consume_token(&Token::RParen) {
- // allow a trailing comma, even though it's not in standard
- break;
- } else if !comma {
- return self.expected(
- "',' or ')' after option definition",
- self.parser.peek_token(),
- );
- }
- }
- Ok(options)
- }
}

#[cfg(test)]
@@ -997,27 +969,6 @@ mod tests {
});
expect_parse_ok(sql, expected)?;

- // positive case: it is ok for case insensitive sql stmt with has_header option tokens
- let sqls = vec![
- "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('FORMAT.HAS_HEADER' 'TRUE')",
- "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('format.has_header' 'true')",
- ];
- for sql in sqls {
- let expected = Statement::CreateExternalTable(CreateExternalTable {
- name: "t".into(),
- columns: vec![make_column_def("c1", DataType::Int(display))],
- file_type: "CSV".to_string(),
- location: "foo.csv".into(),
- table_partition_cols: vec![],
- order_exprs: vec![],
- if_not_exists: false,
- unbounded: false,
- options: HashMap::from([("format.has_header".into(), "true".into())]),
- constraints: vec![],
- });
- expect_parse_ok(sql, expected)?;
- }

// positive case: it is ok for sql stmt with `COMPRESSION TYPE GZIP` tokens
let sqls = vec![
("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS
@@ -1357,9 +1308,8 @@ mod tests {
source: object_name("foo"),
target: "bar".to_string(),
partitioned_by: vec![],
- has_header: false,
stored_as: Some("CSV".to_owned()),
- options: vec![],
+ options: HashMap::new(),
});

assert_eq!(verified_stmt(sql), expected);
@@ -1393,9 +1343,8 @@ mod tests {
source: object_name("foo"),
target: "bar".to_string(),
partitioned_by: vec![],
- has_header: false,
stored_as: Some("PARQUET".to_owned()),
- options: vec![],
+ options: HashMap::new(),
});
let expected = Statement::Explain(ExplainStatement {
analyze,
@@ -1430,9 +1379,8 @@ mod tests {
source: CopyToSource::Query(query),
target: "bar".to_string(),
partitioned_by: vec![],
- has_header: true,
stored_as: Some("CSV".to_owned()),
- options: vec![],
+ options: HashMap::from([("format.has_header".into(), "true".into())]),
});
assert_eq!(verified_stmt(sql), expected);
Ok(())
@@ -1445,30 +1393,22 @@ mod tests {
source: object_name("foo"),
target: "bar".to_string(),
partitioned_by: vec![],
- has_header: false,
stored_as: Some("CSV".to_owned()),
- options: vec![(
- "row_group_size".to_string(),
- Value::Number("55".to_string(), false),
- )],
+ options: HashMap::from([("row_group_size".into(), "55".into())]),
});
assert_eq!(verified_stmt(sql), expected);
Ok(())
}

#[test]
fn copy_to_partitioned_by() -> Result<(), ParserError> {
let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS (row_group_size 55)";
let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS ('row_group_size' '55')";
let expected = Statement::CopyTo(CopyToStatement {
source: object_name("foo"),
target: "bar".to_string(),
partitioned_by: vec!["a".to_string()],
- has_header: false,
stored_as: Some("CSV".to_owned()),
- options: vec![(
- "row_group_size".to_string(),
- Value::Number("55".to_string(), false),
- )],
+ options: HashMap::from([("row_group_size".to_string(), "55".into())]),
});
assert_eq!(verified_stmt(sql), expected);
Ok(())
@@ -1478,18 +1418,12 @@ mod tests {
fn copy_to_multi_options() -> Result<(), ParserError> {
// order of options is preserved
let sql =
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)";
"COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' '55', 'format.compression' 'snappy')";

- let expected_options = vec![
- (
- "format.row_group_size".to_string(),
- Value::Number("55".to_string(), false),
- ),
- (
- "format.compression".to_string(),
- Value::UnQuotedString("snappy".to_string()),
- ),
- ];
+ let expected_options = HashMap::from([
+ ("format.row_group_size".to_string(), "55".into()),
+ ("format.compression".to_string(), "snappy".into()),
+ ]);

let mut statements = DFParser::parse_sql(sql).unwrap();
assert_eq!(statements.len(), 1);
@@ -1527,6 +1461,7 @@ mod tests {
/// `canonical` sql string
fn one_statement_parses_to(sql: &str, canonical: &str) -> Statement {
let mut statements = DFParser::parse_sql(sql).unwrap();
+ println!("{:?}", statements[0]);
assert_eq!(statements.len(), 1);

if sql != canonical {
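Note: with `parse_value_options` removed, COPY's OPTIONS clause goes through `parse_string_options`, so judging by the updated tests, both keys and values must now be single-quoted strings ('row_group_size' '55' rather than row_group_size 55), and the parsed result is a `HashMap<String, String>`. A minimal usage sketch, assuming the crate-external path `datafusion_sql::parser::DFParser` (the same API the tests above call):

    use datafusion_sql::parser::DFParser;

    fn main() {
        // Quoted key and value: accepted after this change; a bare
        // `OPTIONS (row_group_size 55)` would now be rejected.
        let sql = "COPY foo TO bar STORED AS CSV OPTIONS ('format.has_header' 'true')";
        let statements = DFParser::parse_sql(sql).unwrap();
        assert_eq!(statements.len(), 1);
    }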
31 changes: 1 addition & 30 deletions datafusion/sql/src/statement.rs
@@ -849,36 +849,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
};

- let mut options = HashMap::new();
- for (key, value) in statement.options {
- let value_string = match value {
- Value::SingleQuotedString(s) => s.to_string(),
- Value::DollarQuotedString(s) => s.to_string(),
- Value::UnQuotedString(s) => s.to_string(),
- Value::Number(_, _) | Value::Boolean(_) => value.to_string(),
- Value::DoubleQuotedString(_)
- | Value::EscapedStringLiteral(_)
- | Value::NationalStringLiteral(_)
- | Value::SingleQuotedByteStringLiteral(_)
- | Value::DoubleQuotedByteStringLiteral(_)
- | Value::RawStringLiteral(_)
- | Value::HexStringLiteral(_)
- | Value::Null
- | Value::Placeholder(_) => {
- return plan_err!("Unsupported Value in COPY statement {}", value);
- }
- };
- if !(&key.contains('.')) {
- // If config does not belong to any namespace, assume it is
- // a format option and apply the format prefix for backwards
- // compatibility.
-
- let renamed_key = format!("format.{}", key);
- options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
- } else {
- options.insert(key.to_lowercase(), value_string.to_lowercase());
- }
- }
+ let options = statement.options;

let file_type = if let Some(file_type) = statement.stored_as {
FileType::from_str(&file_type).map_err(|_| {
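Note: the planner previously normalized COPY options itself; that loop is deleted and `statement.options` now flows through verbatim, so key casing and the `format.` prefix are whatever the user wrote. For reference, a sketch of the legacy normalization this hunk removes:

    // Un-namespaced keys were treated as format options, and both key and
    // value were lowercased: ("HAS_HEADER", "TRUE") -> ("format.has_header", "true").
    fn legacy_normalize(key: &str, value: &str) -> (String, String) {
        let key = if key.contains('.') {
            key.to_lowercase()
        } else {
            format!("format.{key}").to_lowercase()
        };
        (key, value.to_lowercase())
    }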
8 changes: 0 additions & 8 deletions datafusion/sqllogictest/test_files/create_external_table.slt
@@ -52,14 +52,6 @@ CREATE EXTERNAL TABLE t STORED AS CSV WITH HEADER LOCATION 'abc'
statement error DataFusion error: SQL error: ParserError\("Expected BY, found: LOCATION"\)
CREATE EXTERNAL TABLE t STORED AS CSV PARTITIONED LOCATION 'abc'

- # Missing `TYPE` in COMPRESSION clause
- statement error DataFusion error: SQL error: ParserError\("Expected TYPE, found: LOCATION"\)
- CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION LOCATION 'abc'
-
- # Invalid compression type
- statement error DataFusion error: SQL error: ParserError\("Unsupported file compression type ZZZ"\)
- CREATE EXTERNAL TABLE t STORED AS CSV COMPRESSION TYPE ZZZ LOCATION 'blahblah'

# Duplicate `STORED AS` clause
statement error DataFusion error: SQL error: ParserError\("STORED AS specified more than once"\)
CREATE EXTERNAL TABLE t STORED AS CSV STORED AS PARQUET LOCATION 'foo.parquet'
1 change: 0 additions & 1 deletion datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -278,7 +278,6 @@ statement ok
CREATE EXTERNAL TABLE avro_table
STORED AS AVRO
LOCATION '../../testing/data/avro/simple_enum.avro'
- OPTIONS ('format.has_header' 'true');


# It would be great to see the file read as "4" groups with even sizes (offsets) eventually
