FIX-#7321: Using 'C' engine instead of 'pyarrow' for getting metadata in read_csv #7322

Merged 1 commit on Jun 18, 2024
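
For context, pandas' `pyarrow` parser rejects the `nrows` argument that Modin's dispatcher passes when it samples a single row for metadata. Below is a minimal reproduction of that limitation, assuming a pandas version with the pyarrow engine available (the exact error wording may vary):

```python
import io

import pandas as pd

csv_buf = io.StringIO("a,b,c\n1,2,3\n4,5,6\n")

# A one-row sample works fine with the default 'c' engine ...
meta = pd.read_csv(csv_buf, nrows=1, engine="c")
print(meta.columns.tolist())  # ['a', 'b', 'c']

# ... but the 'pyarrow' engine rejects `nrows`, so Modin's metadata read
# used to fail before this fix.
csv_buf.seek(0)
try:
    pd.read_csv(csv_buf, nrows=1, engine="pyarrow")
except ValueError as err:
    print(err)  # e.g. "The 'nrows' option is not supported with the 'pyarrow' engine"
```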
34 changes: 19 additions & 15 deletions modin/core/io/text/text_file_dispatcher.py
@@ -216,7 +216,7 @@ def partitioned_file(
newline: bytes = None,
header_size: int = 0,
pre_reading: int = 0,
- read_callback_kw: dict = None,
+ get_metadata_kw: dict = None,
):
"""
Compute chunk sizes in bytes for every partition.
@@ -244,7 +244,7 @@ def partitioned_file(
Number of rows, that occupied by header.
pre_reading : int, default: 0
Number of rows between header and skipped rows, that should be read.
- read_callback_kw : dict, optional
+ get_metadata_kw : dict, optional
Keyword arguments for `cls.read_callback` to compute metadata if needed.
This option is not compatible with `pre_reading!=0`.

@@ -255,11 +255,11 @@
int : partition start read byte
int : partition end read byte
pandas.DataFrame or None
- Dataframe from which metadata can be retrieved. Can be None if `read_callback_kw=None`.
+ Dataframe from which metadata can be retrieved. Can be None if `get_metadata_kw=None`.
"""
- if read_callback_kw is not None and pre_reading != 0:
+ if get_metadata_kw is not None and pre_reading != 0:
raise ValueError(
f"Incompatible combination of parameters: {read_callback_kw=}, {pre_reading=}"
f"Incompatible combination of parameters: {get_metadata_kw=}, {pre_reading=}"
)
read_rows_counter = 0
outside_quotes = True
@@ -297,11 +297,11 @@
rows_skipper(skiprows)
else:
rows_skipper(skiprows)
- if read_callback_kw:
+ if get_metadata_kw:
start = f.tell()
# For correct behavior, if we want to avoid double skipping rows,
# we need to get metadata after skipping.
- pd_df_metadata = cls.read_callback(f, **read_callback_kw)
+ pd_df_metadata = cls.read_callback(f, **get_metadata_kw)
f.seek(start)
rows_skipper(header_size)

@@ -1063,28 +1063,32 @@ def _read(cls, filepath_or_buffer, **kwargs):
and (usecols is None or skiprows is None)
and pre_reading == 0
)
- read_callback_kw = dict(kwargs, nrows=1, skipfooter=0, index_col=index_col)
+ get_metadata_kw = dict(kwargs, nrows=1, skipfooter=0, index_col=index_col)
+ if get_metadata_kw.get("engine", None) == "pyarrow":
+     # pyarrow engine doesn't support `nrows` option;
+     # https://github.com/pandas-dev/pandas/issues/38872 can be used to track pyarrow engine features
+     get_metadata_kw["engine"] = "c"
if not can_compute_metadata_while_skipping_rows:
pd_df_metadata = cls.read_callback(
filepath_or_buffer_md,
- **read_callback_kw,
+ **get_metadata_kw,
)
column_names = pd_df_metadata.columns
column_widths, num_splits = cls._define_metadata(
pd_df_metadata, column_names
)
- read_callback_kw = None
+ get_metadata_kw = None
else:
- read_callback_kw = dict(read_callback_kw, skiprows=None)
+ get_metadata_kw = dict(get_metadata_kw, skiprows=None)
# `memory_map` doesn't work with file-like object so we can't use it here.
# We can definitely skip it without violating the reading logic
# since this parameter is intended to optimize reading.
# For reading a couple of lines, this is not essential.
read_callback_kw.pop("memory_map", None)
get_metadata_kw.pop("memory_map", None)
# These parameters are already used when opening file `f`,
# they do not need to be used again.
read_callback_kw.pop("storage_options", None)
read_callback_kw.pop("compression", None)
get_metadata_kw.pop("storage_options", None)
get_metadata_kw.pop("compression", None)

with OpenFile(
filepath_or_buffer_md,
@@ -1110,7 +1114,7 @@ def _read(cls, filepath_or_buffer, **kwargs):
newline=newline,
header_size=header_size,
pre_reading=pre_reading,
- read_callback_kw=read_callback_kw,
+ get_metadata_kw=get_metadata_kw,
)
if can_compute_metadata_while_skipping_rows:
pd_df_metadata = pd_df_metadata_temp
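
Taken on its own, the fallback added in `_read` boils down to the sketch below; `prepare_metadata_kwargs` is a hypothetical name, since Modin builds the dict inline, but the behavior mirrors the hunk above.

```python
def prepare_metadata_kwargs(read_csv_kwargs: dict, index_col=None) -> dict:
    """Build kwargs for the one-row metadata read, avoiding the pyarrow engine.

    Hypothetical standalone version of the logic `_read` applies inline.
    """
    metadata_kw = dict(read_csv_kwargs, nrows=1, skipfooter=0, index_col=index_col)
    if metadata_kw.get("engine", None) == "pyarrow":
        # The pyarrow engine rejects `nrows`, which the metadata sample relies on,
        # so the sample row is parsed with the 'c' engine instead.
        metadata_kw["engine"] = "c"
    return metadata_kw
```

Only the copied dict is touched, so the `engine` the caller asked for is still the one recorded in the original `kwargs`.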
2 changes: 1 addition & 1 deletion modin/tests/pandas/test_io.py
@@ -653,7 +653,7 @@ def test_read_csv_encoding_976(self, pathlike):
# Quoting, Compression parameters tests
@pytest.mark.parametrize("compression", ["infer", "gzip", "bz2", "xz", "zip"])
@pytest.mark.parametrize("encoding", [None, "latin8", "utf16"])
@pytest.mark.parametrize("engine", [None, "python", "c"])
@pytest.mark.parametrize("engine", [None, "python", "c", "pyarrow"])
def test_read_csv_compression(self, make_csv_file, compression, encoding, engine):
unique_filename = make_csv_file(encoding=encoding, compression=compression)
expected_exception = None
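
With the extra `pyarrow` parametrization, `test_read_csv_compression` now covers reads like the hedged sketch below (the file name is a placeholder and the pyarrow engine must be installed):

```python
import modin.pandas as pd

# Before the fix, Modin's internal one-row metadata read raised
# "The 'nrows' option is not supported with the 'pyarrow' engine";
# with the 'c'-engine fallback the call is expected to succeed.
df = pd.read_csv("data.csv.gz", engine="pyarrow", compression="gzip")
print(df.head())
```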