
fix(rust, python): respect 'ignore_errors=False' in csv parser (#10641)
ritchie46 authored Aug 21, 2023
1 parent 15527ae commit 6dd3432
Showing 8 changed files with 96 additions and 36 deletions.
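
The user-visible effect of the fix: a value that cannot be parsed or cast to the requested dtype now raises a ComputeError when 'ignore_errors=False' (the default) instead of being silently replaced by a null. A minimal sketch of the two behaviors through the Python API (the column name and values are invented for illustration):

import io

import polars as pl

csv = "inventory\n10\n400\n90\n"  # 400 does not fit in an Int8

# Lenient: the overflowing value becomes a null.
df = pl.read_csv(io.StringIO(csv), dtypes={"inventory": pl.Int8}, ignore_errors=True)
print(df["inventory"].to_list())  # [10, None, 90]

# Strict (the default): the same cast now raises instead of inserting nulls.
try:
    pl.read_csv(io.StringIO(csv), dtypes={"inventory": pl.Int8}, ignore_errors=False)
except pl.ComputeError as exc:
    print(f"ComputeError: {exc}")
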
6 changes: 2 additions & 4 deletions crates/polars-core/src/series/mod.rs
@@ -32,7 +32,7 @@ use crate::chunked_array::Settings;
 use crate::prelude::unique::rank::rank;
 #[cfg(feature = "zip_with")]
 use crate::series::arithmetic::coerce_lhs_rhs;
-use crate::utils::{_split_offsets, split_ca, split_series, Wrap};
+use crate::utils::{_split_offsets, get_casting_failures, split_ca, split_series, Wrap};
 use crate::POOL;

 /// # Series
@@ -790,14 +790,12 @@ impl Series {
         }
         let s = self.0.cast(dtype)?;
         if null_count != s.null_count() {
-            let failure_mask = !self.is_null() & s.is_null();
-            let failures = self.filter_threaded(&failure_mask, false)?.unique()?;
+            let failures = get_casting_failures(self, &s)?;
             polars_bail!(
                 ComputeError:
                 "strict conversion from `{}` to `{}` failed for column: {}, value(s) {}; \
                 if you were trying to cast Utf8 to temporal dtypes, consider using `strptime`",
                 self.dtype(), dtype, s.name(), failures.fmt_list(),
-
             );
         } else {
             Ok(s)
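
This is the strict path that Series.cast takes when strict=True; the change above only moves the collection of failing values into a shared helper. For reference, the same error can be provoked directly from the Python API (the series name and values are invented for illustration):

import polars as pl

s = pl.Series("a", ["1", "2", "xyz"])
print(s.cast(pl.Int64, strict=False).to_list())  # lenient: [1, 2, None]
try:
    s.cast(pl.Int64, strict=True)  # strict: raises and lists the failing value(s)
except pl.ComputeError as exc:
    print(f"ComputeError: {exc}")
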
5 changes: 5 additions & 0 deletions crates/polars-core/src/utils/series.rs
@@ -39,3 +39,8 @@ pub fn ensure_sorted_arg(s: &Series, operation: &str) -> PolarsResult<()> {
     ", operation);
     Ok(())
 }
+
+pub fn get_casting_failures(input: &Series, output: &Series) -> PolarsResult<Series> {
+    let failure_mask = !input.is_null() & output.is_null();
+    input.filter_threaded(&failure_mask, false)?.unique()
+}
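
The helper detects failed values by comparing nullness before and after the cast: anything that was non-null going in but null coming out must have failed to convert. Roughly the same idea expressed with the Python API (the series name and values are made up):

import polars as pl

s = pl.Series("inventory", ["10", "400", "abc", "10"])
out = s.cast(pl.Int8, strict=False)  # lenient cast: failures become nulls

# Non-null before the cast but null after it => the value failed to convert.
failure_mask = s.is_not_null() & out.is_null()
failures = s.filter(failure_mask).unique()
print(failures.to_list())  # ['400', 'abc'] (order of unique values may vary)
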
36 changes: 24 additions & 12 deletions crates/polars-io/src/csv/buffer.rs
@@ -422,23 +422,31 @@
             // Safety:
             // we just checked it is ascii
             unsafe { std::str::from_utf8_unchecked(bytes) }
-        } else if ignore_errors {
-            buf.builder.append_null();
-            return Ok(());
-        } else if !ignore_errors && std::str::from_utf8(bytes).is_err() {
-            polars_bail!(ComputeError: "invalid utf-8 sequence");
         } else {
-            buf.builder.append_null();
-            return Ok(());
+            match std::str::from_utf8(bytes) {
+                Ok(val) => val,
+                Err(_) => {
+                    if ignore_errors {
+                        buf.builder.append_null();
+                        return Ok(());
+                    } else {
+                        polars_bail!(ComputeError: "invalid utf-8 sequence");
+                    }
+                },
+            }
         };

         let pattern = match &buf.compiled {
             Some(compiled) => compiled.pattern,
             None => match infer_pattern_single(val) {
                 Some(pattern) => pattern,
                 None => {
-                    buf.builder.append_null();
-                    return Ok(());
+                    if ignore_errors {
+                        buf.builder.append_null();
+                        return Ok(());
+                    } else {
+                        polars_bail!(ComputeError: "could not find a 'date/datetime' pattern for {}", val)
+                    }
                 },
             },
         };
@@ -449,9 +457,13 @@
                 buf.builder.append_option(parsed);
                 Ok(())
             },
-            Err(_) => {
-                buf.builder.append_null();
-                Ok(())
+            Err(err) => {
+                if ignore_errors {
+                    buf.builder.append_null();
+                    Ok(())
+                } else {
+                    Err(err)
+                }
             },
         }
     }
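
With these branches in place, a date or datetime column containing an unparsable value fails loudly unless 'ignore_errors=True' is passed. A small illustration through the Python reader (the data is invented and mirrors the new tests added below):

import io

import polars as pl

data = "int,float,date\n3,3.4,X\n4,4.5,2023-08-21\n"

# Lenient: the unparsable date becomes a null.
df = pl.read_csv(io.StringIO(data), dtypes={"date": pl.Date}, ignore_errors=True)
print(df["date"].to_list())  # [None, datetime.date(2023, 8, 21)]

# Strict (the default): the same value now raises a ComputeError.
try:
    pl.read_csv(io.StringIO(data), dtypes={"date": pl.Date}, ignore_errors=False)
except pl.ComputeError as exc:
    print(f"ComputeError: {exc}")
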
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read_impl/batched_mmap.rs
@@ -249,7 +249,7 @@ impl<'a> BatchedCsvReaderMmap<'a> {
             self.starting_point_offset,
         )?;

-        cast_columns(&mut df, &self.to_cast, false)?;
+        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

         update_string_stats(&self.str_capacities, &self.str_columns, &df)?;
         if let Some(rc) = &self.row_count {
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read_impl/batched_read.rs
@@ -346,7 +346,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
             self.starting_point_offset,
         )?;

-        cast_columns(&mut df, &self.to_cast, false)?;
+        cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;

         update_string_stats(&self.str_capacities, &self.str_columns, &df)?;
         if let Some(rc) = &self.row_count {
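
Both batched readers go through the same cast_columns, so the flag has to be threaded through here as well. A hedged sketch of how this surfaces through the Python batched reader, assuming pl.read_csv_batched forwards dtypes and ignore_errors the same way pl.read_csv does (file contents and names are invented):

import tempfile

import polars as pl

with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as f:
    f.write("inventory\n10\n400\n90\n")  # 400 does not fit in an Int8
    path = f.name

reader = pl.read_csv_batched(path, dtypes={"inventory": pl.Int8}, ignore_errors=False)
try:
    reader.next_batches(1)  # the failing cast should now raise here
except pl.ComputeError as exc:
    print(f"ComputeError: {exc}")
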
48 changes: 30 additions & 18 deletions crates/polars-io/src/csv/read_impl/mod.rs
@@ -11,7 +11,7 @@ pub use batched_read::*;
 use polars_arrow::array::*;
 use polars_core::config::verbose;
 use polars_core::prelude::*;
-use polars_core::utils::accumulate_dataframes_vertical;
+use polars_core::utils::{accumulate_dataframes_vertical, get_casting_failures};
 use polars_core::POOL;
 #[cfg(feature = "polars-time")]
 use polars_time::prelude::*;
@@ -32,21 +32,33 @@ pub(crate) fn cast_columns(
     df: &mut DataFrame,
     to_cast: &[Field],
     parallel: bool,
+    ignore_errors: bool,
 ) -> PolarsResult<()> {
-    let cast_fn = |s: &Series, fld: &Field| match (s.dtype(), fld.data_type()) {
-        #[cfg(feature = "temporal")]
-        (DataType::Utf8, DataType::Date) => s
-            .utf8()
-            .unwrap()
-            .as_date(None, false)
-            .map(|ca| ca.into_series()),
-        #[cfg(feature = "temporal")]
-        (DataType::Utf8, DataType::Datetime(tu, _)) => s
-            .utf8()
-            .unwrap()
-            .as_datetime(None, *tu, false, false, None, None)
-            .map(|ca| ca.into_series()),
-        (_, dt) => s.cast(dt),
+    let cast_fn = |s: &Series, fld: &Field| {
+        let out = match (s.dtype(), fld.data_type()) {
+            #[cfg(feature = "temporal")]
+            (DataType::Utf8, DataType::Date) => s
+                .utf8()
+                .unwrap()
+                .as_date(None, false)
+                .map(|ca| ca.into_series()),
+            #[cfg(feature = "temporal")]
+            (DataType::Utf8, DataType::Datetime(tu, _)) => s
+                .utf8()
+                .unwrap()
+                .as_datetime(None, *tu, false, false, None, None)
+                .map(|ca| ca.into_series()),
+            (_, dt) => s.cast(dt),
+        }?;
+        if !ignore_errors && s.null_count() != out.null_count() {
+            let failures = get_casting_failures(s, &out)?;
+            polars_bail!(
+                ComputeError:
+                "parsing to `{}` failed for column: {}, value(s) {};",
+                fld.data_type(), s.name(), failures.fmt_list(),
+            )
+        }
+        Ok(out)
     };

     if parallel {
@@ -618,7 +630,7 @@ impl<'a> CoreReader<'a> {
                         local_df.with_row_count_mut(&rc.name, Some(rc.offset));
                     };

-                    cast_columns(&mut local_df, &self.to_cast, false)?;
+                    cast_columns(&mut local_df, &self.to_cast, false, self.ignore_errors)?;
                     let s = predicate.evaluate(&local_df)?;
                     let mask = s.bool()?;
                     local_df = local_df.filter(mask)?;
@@ -681,7 +693,7 @@ impl<'a> CoreReader<'a> {
                         update_string_stats(&str_capacities, &str_columns, &df)?;
                     }

-                    cast_columns(&mut df, &self.to_cast, false)?;
+                    cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
                     if let Some(rc) = &self.row_count {
                         df.with_row_count_mut(&rc.name, Some(rc.offset));
                     }
@@ -731,7 +743,7 @@ impl<'a> CoreReader<'a> {
                     )
                 };

-                cast_columns(&mut df, &self.to_cast, false)?;
+                cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
                 if let Some(rc) = &self.row_count {
                     df.with_row_count_mut(&rc.name, Some(rc.offset));
                 }
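
The new check in cast_columns relies on the fact that a lenient cast surfaces every failure as an extra null, so comparing null counts before and after the cast is enough to detect that something went wrong. The same invariant, sketched with the Python API (values are invented):

import polars as pl

s = pl.Series("x", ["1", "2", "not-a-number"])
out = s.cast(pl.Int64, strict=False)  # the lenient cast turns the bad value into a null

# No nulls went in, one came out: at least one value failed to cast.
assert s.null_count() == 0
assert out.null_count() == 1
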
1 change: 1 addition & 0 deletions crates/polars/tests/it/io/csv.rs
@@ -442,6 +442,7 @@ AUDCAD,1616455921,0.96212,0.95666,1
             "b",
             DataType::Datetime(TimeUnit::Nanoseconds, None),
         )]))))
+        .with_ignore_errors(true)
         .finish()?;

     assert_eq!(
32 changes: 32 additions & 0 deletions py-polars/tests/unit/io/test_csv.py
@@ -1440,3 +1440,35 @@ def test_csv_quote_styles() -> None:
         df.write_csv(quote_style="non_numeric", quote="8")
         == '8float8,8string8,8int8,8bool8\n1.0,8a8,1,8true8\n2.0,8abc8,2,8false8\n,8"hello8,3,\n'
     )
+
+
+def test_ignore_errors_casting_dtypes() -> None:
+    csv = """inventory
+10
+
+400
+90
+"""
+
+    assert pl.read_csv(
+        source=io.StringIO(csv),
+        dtypes={"inventory": pl.Int8},
+        ignore_errors=True,
+    ).to_dict(False) == {"inventory": [10, None, None, 90]}
+
+    with pytest.raises(pl.ComputeError):
+        pl.read_csv(
+            source=io.StringIO(csv),
+            dtypes={"inventory": pl.Int8},
+            ignore_errors=False,
+        )
+
+
+def test_ignore_errors_date_parser() -> None:
+    data_invalid_date = "int,float,date\n3,3.4,X"
+    with pytest.raises(pl.ComputeError):
+        pl.read_csv(
+            source=io.StringIO(data_invalid_date),
+            dtypes={"date": pl.Date},
+            ignore_errors=False,
+        )
