Support pyarrow back to version 1.0.0 (#111)
ianthomas23 authored Mar 21, 2023
1 parent d102aa0 commit 25fd687
Showing 1 changed file with 68 additions and 25 deletions.
93 changes: 68 additions & 25 deletions spatialpandas/io/parquet.py
@@ -29,6 +29,9 @@
 # improve pandas compatibility, based on geopandas _compat.py
 PANDAS_GE_12 = Version(pd.__version__) >= Version("1.2.0")
 
+# When we drop support for pyarrow < 5 all code related to this can be removed.
+LEGACY_PYARROW = Version(pa.__version__) < Version("5.0.0")
+
 
 def _load_parquet_pandas_metadata(
     path,
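Note: the kwargs switch this flag drives exists because the two ParquetDataset implementations accept different arguments. A minimal sketch of the pattern (the directory path "some/dir" is a placeholder; the module already imports `Version` from `packaging.version`):

import pyarrow as pa
import pyarrow.parquet as pq
from packaging.version import Version

# pyarrow 1.x-4.x: legacy ParquetDataset, which accepts validate_schema=.
# pyarrow >= 5: new dataset API, selected with use_legacy_dataset=False.
LEGACY_PYARROW = Version(pa.__version__) < Version("5.0.0")

if LEGACY_PYARROW:
    basic_kwargs = dict(validate_schema=False)
else:
    basic_kwargs = dict(use_legacy_dataset=False)

dataset = pq.ParquetDataset("some/dir", **basic_kwargs)  # placeholder path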
@@ -42,21 +45,34 @@ def _load_parquet_pandas_metadata(
         raise ValueError("Path not found: " + path)
 
     if filesystem.isdir(path):
+        if LEGACY_PYARROW:
+            basic_kwargs = dict(filesystem=filesystem, validate_schema=False)
+        else:
+            basic_kwargs = dict(filesystem=filesystem, use_legacy_dataset=False)
+
         pqds = pq.ParquetDataset(
             path,
-            filesystem=filesystem,
-            #validate_schema=False,
-            use_legacy_dataset=False,
+            **basic_kwargs,
             **engine_kwargs,
         )
-        filename = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
-        try:
-            common_metadata = pq.read_metadata(filename)
-        except FileNotFoundError:
-            # Common metadata doesn't exist, so get metadata for first piece instead
-            filename = pathlib.Path(pqds.files[0])
-            common_metadata = pq.read_metadata(filename)
-        metadata = common_metadata.metadata
+
+        if LEGACY_PYARROW:
+            common_metadata = pqds.common_metadata
+            if common_metadata is None:
+                # Get metadata for first piece
+                piece = pqds.pieces[0]
+                metadata = piece.get_metadata().metadata
+            else:
+                metadata = pqds.common_metadata.metadata
+        else:
+            filename = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
+            try:
+                common_metadata = pq.read_metadata(filename)
+            except FileNotFoundError:
+                # Common metadata doesn't exist, so get metadata for first piece instead
+                filename = pathlib.Path(pqds.files[0])
+                common_metadata = pq.read_metadata(filename)
+            metadata = common_metadata.metadata
     else:
         with filesystem.open(path) as f:
             pf = pq.ParquetFile(f)
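Note: on the non-legacy path the pandas metadata is resolved by reading the dataset's `_common_metadata` sidecar and falling back to the first data file. The same lookup as a standalone sketch (`_pandas_metadata_for` is a hypothetical helper; `pqds` is a non-legacy ParquetDataset):

import pathlib
import pyarrow.parquet as pq

def _pandas_metadata_for(pqds):
    # Prefer the _common_metadata sidecar next to the first data file...
    filename = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
    try:
        common_metadata = pq.read_metadata(filename)
    except FileNotFoundError:
        # ...and fall back to the first data file's own footer.
        common_metadata = pq.read_metadata(pqds.files[0])
    return common_metadata.metadata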
@@ -111,17 +127,28 @@ def read_parquet(
     engine_kwargs = engine_kwargs or {}
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
 
+    if LEGACY_PYARROW:
+        basic_kwargs = dict(filesystem=filesystem, validate_schema=False)
+    else:
+        basic_kwargs = dict(filesystem=filesystem, use_legacy_dataset=False)
+
     # Load using pyarrow to handle parquet files and directories across filesystems
     dataset = pq.ParquetDataset(
         path,
-        filesystem=filesystem,
-        #validate_schema=False,
-        use_legacy_dataset=False,
+        **basic_kwargs,
         **engine_kwargs,
         **kwargs,
     )
 
-    metadata = dataset.schema.pandas_metadata
+    if LEGACY_PYARROW:
+        metadata = _load_parquet_pandas_metadata(
+            path,
+            filesystem=filesystem,
+            storage_options=storage_options,
+            engine_kwargs=engine_kwargs,
+        )
+    else:
+        metadata = dataset.schema.pandas_metadata
 
     # If columns specified, prepend index columns to it
     if columns is not None:
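Note: only the legacy branch needs `_load_parquet_pandas_metadata` here, because on pyarrow >= 5 the Arrow schema already decodes the pandas metadata. A quick check, assuming a pandas-written file at the placeholder path "example.parquet":

import pandas as pd
import pyarrow.parquet as pq

pd.DataFrame({"x": [1, 2]}).to_parquet("example.parquet")
dataset = pq.ParquetDataset("example.parquet", use_legacy_dataset=False)
print(dataset.schema.pandas_metadata["columns"])  # decoded pandas metadata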
@@ -290,12 +317,15 @@ def _perform_read_parquet_dask(
         filesystem,
         storage_options,
     )
+    if LEGACY_PYARROW:
+        basic_kwargs = dict(filesystem=filesystem, validate_schema=False)
+    else:
+        basic_kwargs = dict(filesystem=filesystem, use_legacy_dataset=False)
+
     datasets = [
         pa.parquet.ParquetDataset(
             path,
-            filesystem=filesystem,
-            #validate_schema=False,
-            use_legacy_dataset=False,
+            **basic_kwargs,
             **engine_kwargs,
         ) for path in paths
     ]
@@ -304,7 +334,10 @@
     pieces = []
     for dataset in datasets:
         # Perform natural sort on pieces so that "part.10" comes after "part.2"
-        dataset_pieces = sorted(dataset.fragments, key=lambda piece: natural_sort_key(piece.path))
+        fragments = getattr(dataset, "fragments", None)
+        if fragments is None:
+            fragments = dataset.pieces
+        dataset_pieces = sorted(fragments, key=lambda piece: natural_sort_key(piece.path))
         pieces.extend(dataset_pieces)
 
     delayed_partitions = [
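Note: `natural_sort_key` (imported near the top of this module) is what makes "part.10" come after "part.2". A typical implementation of such a key, sketched here and not necessarily the one spatialpandas ships:

import re

def natural_sort_key(s):
    # Split into digit and non-digit runs and compare digit runs as ints,
    # so "part.2" < "part.10" despite "10" < "2" lexicographically.
    return [int(part) if part.isdigit() else part
            for part in re.split(r"(\d+)", s)]

assert sorted(["part.10", "part.2"], key=natural_sort_key) == ["part.2", "part.10"]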
@@ -356,12 +389,18 @@ def _perform_read_parquet_dask(
     else:
         cols_no_index = None
 
+    if LEGACY_PYARROW:
+        files = paths
+    else:
+        files = getattr(datasets[0], "files", paths)
+
     meta = dd_read_parquet(
-        datasets[0].files[0],
+        files[0],
         columns=cols_no_index,
         filesystem=filesystem,
         engine='pyarrow',
         categories=categories,
+        ignore_metadata_file=True,
         storage_options=storage_options,
         **engine_kwargs,
     )._meta
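Note: like the `fragments`/`pieces` switch above, `files` is probed with `getattr` so any dataset object exposing the new-API attribute works without a second version check. The shape of that fallback as a hypothetical helper:

def first_file(dataset, fallback_paths):
    # New-API ParquetDataset exposes .files; legacy datasets do not,
    # so fall back to the caller-supplied paths. (Hypothetical helper.)
    return getattr(dataset, "files", fallback_paths)[0]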
@@ -441,11 +480,15 @@
 
 def _load_partition_bounds(pqds):
     partition_bounds = None
-    filename = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
-    try:
-        common_metadata = pq.read_metadata(filename)
-    except FileNotFoundError:
-        common_metadata = None
+
+    if LEGACY_PYARROW:
+        common_metadata = pqds.common_metadata
+    else:
+        filename = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
+        try:
+            common_metadata = pq.read_metadata(filename)
+        except FileNotFoundError:
+            common_metadata = None
 
     if common_metadata is not None and b'spatialpandas' in common_metadata.metadata:
         spatial_metadata = json.loads(
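Note: the b'spatialpandas' entry holds JSON-encoded spatial metadata in the file's key/value metadata, which is a bytes-to-bytes mapping. The shape of that decode, sketched as a hypothetical excerpt with `common_metadata` resolved as above:

import json

def _spatial_metadata(common_metadata):
    # Parquet key/value metadata maps bytes keys to bytes values.
    raw = common_metadata.metadata.get(b"spatialpandas")
    return json.loads(raw.decode("utf-8")) if raw is not None else None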
