diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 52a00cf974abd..22746704aab2c 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -126,10 +126,11 @@ impl CsvParser { } writer.insert(|desc| { if let Some(i) = headers.iter().position(|name| name == &desc.name) { - Self::parse_string( - &desc.data_type, - fields.get_mut(i).map(std::mem::take).unwrap_or_default(), - ) + let value = fields.get_mut(i).map(std::mem::take).unwrap_or_default(); + if value.is_empty() { + return Ok(None); + } + Self::parse_string(&desc.data_type, value) } else { Ok(None) } @@ -138,6 +139,9 @@ impl CsvParser { fields.reverse(); writer.insert(|desc| { if let Some(value) = fields.pop() { + if value.is_empty() { + return Ok(None); + } Self::parse_string(&desc.data_type, value) } else { Ok(None) @@ -162,6 +166,7 @@ mod tests { r#""15541","a,1,1,",4"#, r#"0,"""0",0"#, r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#, + r#",,,,"#, ]; let descs = vec![ SourceColumnDesc::simple("a", DataType::Int32, 0.into()), @@ -252,6 +257,14 @@ mod tests { (Some(ScalarImpl::Int32(0))) ); } + + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!(row.datum_at(0), None); + assert_eq!(row.datum_at(1), None); + assert_eq!(row.datum_at(2), None); + } } #[tokio::test] async fn test_csv_with_headers() {