Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

REF/BUG/TYP: read_csv shouldn't close user-provided file handles #36997

Merged
merged 10 commits into from
Nov 4, 2020
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.2.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ I/O
- Bug in output rendering of complex numbers showing too many trailing zeros (:issue:`36799`)
- Bug in :class:`HDFStore` threw a ``TypeError`` when exporting an empty :class:`DataFrame` with ``datetime64[ns, tz]`` dtypes with a fixed HDF5 store (:issue:`20594`)
- Bug in :class:`HDFStore` was dropping timezone information when exporting :class:`Series` with ``datetime64[ns, tz]`` dtypes with a fixed HDF5 store (:issue:`20594`)
- :func:`read_csv` was closing user-provided binary file handles when ``engine="c"`` and an ``encoding`` was requested (:issue:`36980`)
- Bug in :meth:`DataFrame.to_hdf` was not dropping missing rows with ``dropna=True`` (:issue:`35719`)

Plotting
Expand Down
122 changes: 14 additions & 108 deletions pandas/_libs/parsers.pyx
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
# Copyright (c) 2012, Lambda Foundry, Inc.
# See LICENSE for the license
import bz2
from csv import QUOTE_MINIMAL, QUOTE_NONE, QUOTE_NONNUMERIC
from errno import ENOENT
import gzip
import io
import os
import sys
import time
import warnings
import zipfile

from libc.stdlib cimport free
from libc.string cimport strcasecmp, strlen, strncpy

import cython
from cython import Py_ssize_t

from cpython.bytes cimport PyBytes_AsString, PyBytes_FromString
from cpython.bytes cimport PyBytes_AsString
from cpython.exc cimport PyErr_Fetch, PyErr_Occurred
from cpython.object cimport PyObject
from cpython.ref cimport Py_XDECREF
Expand Down Expand Up @@ -67,7 +62,6 @@ from pandas._libs.khash cimport (
khiter_t,
)

from pandas.compat import get_lzma_file, import_lzma
from pandas.errors import DtypeWarning, EmptyDataError, ParserError, ParserWarning

from pandas.core.dtypes.common import (
Expand All @@ -82,11 +76,10 @@ from pandas.core.dtypes.common import (
)
from pandas.core.dtypes.concat import union_categoricals

lzma = import_lzma()

cdef:
float64_t INF = <float64_t>np.inf
float64_t NEGINF = -INF
int64_t DEFAULT_CHUNKSIZE = 256 * 1024


cdef extern from "headers/portable.h":
Expand Down Expand Up @@ -275,14 +268,15 @@ cdef extern from "parser/io.h":
size_t *bytes_read, int *status)


DEFAULT_CHUNKSIZE = 256 * 1024


cdef class TextReader:
"""

# source: StringIO or file object

.. versionchanged:: 1.2.0
        removed 'compression', 'memory_map', and 'encoding' arguments.
These arguments are outsourced to CParserWrapper.
'source' has to be a file handle.
"""

cdef:
Expand All @@ -299,16 +293,14 @@ cdef class TextReader:

cdef public:
int64_t leading_cols, table_width, skipfooter, buffer_lines
bint allow_leading_cols, mangle_dupe_cols, memory_map, low_memory
bint allow_leading_cols, mangle_dupe_cols, low_memory
bint delim_whitespace
object delimiter, converters
object na_values
object header, orig_header, names, header_start, header_end
object index_col
object skiprows
object dtype
object encoding
object compression
object usecols
list dtype_cast_order
set unnamed_cols
Expand All @@ -321,18 +313,15 @@ cdef class TextReader:
header_end=0,
index_col=None,
names=None,
bint memory_map=False,
tokenize_chunksize=DEFAULT_CHUNKSIZE,
bint delim_whitespace=False,
compression=None,
converters=None,
bint skipinitialspace=False,
escapechar=None,
bint doublequote=True,
quotechar=b'"',
quoting=0,
lineterminator=None,
encoding=None,
comment=None,
decimal=b'.',
thousands=None,
Expand All @@ -356,15 +345,7 @@ cdef class TextReader:
bint skip_blank_lines=True):

# set encoding for native Python and C library
if encoding is not None:
if not isinstance(encoding, bytes):
encoding = encoding.encode('utf-8')
encoding = encoding.lower()
self.c_encoding = <char*>encoding
else:
self.c_encoding = NULL

self.encoding = encoding
self.c_encoding = NULL

self.parser = parser_new()
self.parser.chunksize = tokenize_chunksize
Expand All @@ -374,9 +355,6 @@ cdef class TextReader:
# For timekeeping
self.clocks = []

self.compression = compression
self.memory_map = memory_map

self.parser.usecols = (usecols is not None)

self._setup_parser_source(source)
Expand Down Expand Up @@ -562,11 +540,6 @@ cdef class TextReader:
parser_del(self.parser)

def close(self):
# we need to properly close an open derived
# filehandle here, e.g. and UTFRecoder
if self.handle is not None:
self.handle.close()

# also preemptively free all allocated memory
parser_free(self.parser)
if self.true_set:
Expand Down Expand Up @@ -614,82 +587,15 @@ cdef class TextReader:
cdef:
void *ptr

self.parser.cb_io = NULL
self.parser.cb_cleanup = NULL

if self.compression:
if self.compression == 'gzip':
if isinstance(source, str):
source = gzip.GzipFile(source, 'rb')
else:
source = gzip.GzipFile(fileobj=source)
elif self.compression == 'bz2':
source = bz2.BZ2File(source, 'rb')
elif self.compression == 'zip':
zip_file = zipfile.ZipFile(source)
zip_names = zip_file.namelist()

if len(zip_names) == 1:
file_name = zip_names.pop()
source = zip_file.open(file_name)

elif len(zip_names) == 0:
raise ValueError(f'Zero files found in compressed '
f'zip file {source}')
else:
raise ValueError(f'Multiple files found in compressed '
f'zip file {zip_names}')
elif self.compression == 'xz':
if isinstance(source, str):
source = get_lzma_file(lzma)(source, 'rb')
else:
source = get_lzma_file(lzma)(filename=source)
else:
raise ValueError(f'Unrecognized compression type: '
f'{self.compression}')

if (self.encoding and hasattr(source, "read") and
not hasattr(source, "encoding")):
source = io.TextIOWrapper(
source, self.encoding.decode('utf-8'), newline='')

self.encoding = b'utf-8'
self.c_encoding = <char*>self.encoding

self.handle = source

if isinstance(source, str):
encoding = sys.getfilesystemencoding() or "utf-8"
usource = source
source = source.encode(encoding)

if self.memory_map:
ptr = new_mmap(source)
if ptr == NULL:
# fall back
ptr = new_file_source(source, self.parser.chunksize)
self.parser.cb_io = &buffer_file_bytes
self.parser.cb_cleanup = &del_file_source
else:
self.parser.cb_io = &buffer_mmap_bytes
self.parser.cb_cleanup = &del_mmap
else:
twoertwein marked this conversation as resolved.
Show resolved Hide resolved
ptr = new_file_source(source, self.parser.chunksize)
self.parser.cb_io = &buffer_file_bytes
self.parser.cb_cleanup = &del_file_source
self.parser.source = ptr

elif hasattr(source, 'read'):
# e.g., StringIO

ptr = new_rd_source(source)
self.parser.source = ptr
self.parser.cb_io = &buffer_rd_bytes
self.parser.cb_cleanup = &del_rd_source
else:
if not hasattr(source, "read"):
raise IOError(f'Expected file path name or file-like object, '
f'got {type(source)} type')

ptr = new_rd_source(source)
self.parser.source = ptr
self.parser.cb_io = &buffer_rd_bytes
self.parser.cb_cleanup = &del_rd_source

cdef _get_header(self):
# header is now a list of lists, so field_count should use header[0]

Expand Down
29 changes: 6 additions & 23 deletions pandas/_typing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from datetime import datetime, timedelta, tzinfo
from io import IOBase
from io import BufferedIOBase, RawIOBase, TextIOBase, TextIOWrapper
twoertwein marked this conversation as resolved.
Show resolved Hide resolved
from mmap import mmap
from pathlib import Path
from typing import (
IO,
Expand All @@ -10,7 +10,6 @@
Callable,
Collection,
Dict,
Generic,
Hashable,
List,
Mapping,
Expand Down Expand Up @@ -77,8 +76,6 @@
"ExtensionDtype", str, np.dtype, Type[Union[str, float, int, complex, bool, object]]
]
DtypeObj = Union[np.dtype, "ExtensionDtype"]
FilePathOrBuffer = Union[str, Path, IO[AnyStr], IOBase]
FileOrBuffer = Union[str, IO[AnyStr], IOBase]

# FrameOrSeriesUnion means either a DataFrame or a Series. E.g.
# `def func(a: FrameOrSeriesUnion) -> FrameOrSeriesUnion: ...` means that if a Series
Expand Down Expand Up @@ -133,6 +130,10 @@
"Resampler",
]

# filenames and file-like-objects
Buffer = Union[IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper, mmap]
FileOrBuffer = Union[str, Buffer[T]]
FilePathOrBuffer = Union[Path, FileOrBuffer[T]]

# for arbitrary kwargs passed during reading/writing files
StorageOptions = Optional[Dict[str, Any]]
Expand All @@ -150,21 +151,3 @@

# type of float formatter in DataFrameFormatter
FloatFormatType = Union[str, Callable, "EngFormatter"]


@dataclass
class IOargs(Generic[ModeVar, EncodingVar]):
"""
Return value of io/common.py:get_filepath_or_buffer.

Note (copy&past from io/parsers):
filepath_or_buffer can be Union[FilePathOrBuffer, s3fs.S3File, gcsfs.GCSFile]
though mypy handling of conditional imports is difficult.
See https://github.com/python/mypy/issues/1297
"""

filepath_or_buffer: FileOrBuffer
encoding: EncodingVar
compression: CompressionDict
should_close: bool
mode: Union[ModeVar, str]
6 changes: 3 additions & 3 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import datetime
from io import StringIO
import itertools
import mmap
from textwrap import dedent
from typing import (
IO,
Expand Down Expand Up @@ -2286,10 +2287,9 @@ def to_markdown(
if buf is None:
return result
ioargs = get_filepath_or_buffer(buf, mode=mode, storage_options=storage_options)
assert not isinstance(ioargs.filepath_or_buffer, str)
assert not isinstance(ioargs.filepath_or_buffer, (str, mmap.mmap))
ioargs.filepath_or_buffer.writelines(result)
if ioargs.should_close:
ioargs.filepath_or_buffer.close()
ioargs.close()
return None

@deprecate_kwarg(old_arg_name="fname", new_arg_name="path")
Expand Down
Loading