FIX-#4657: Use fsspec for handling s3/http-like paths instead of `s3fs` (#4710)

Signed-off-by: Alexey Prutskov <lehaprutskov@gmail.com>
prutskov authored Aug 4, 2022
1 parent 4548012 commit a28e319
Showing 10 changed files with 153 additions and 127 deletions.
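
Two `fsspec` helpers do the heavy lifting throughout this commit, so a minimal sketch up front may help (the bucket and key names below are hypothetical, not from the diff):

import fsspec

# `split_protocol` splits off the protocol prefix; in this commit it replaces
# the old S3_ADDRESS_REGEX check for deciding whether a path is remote.
fsspec.core.split_protocol("s3://bucket/key.csv")  # -> ("s3", "bucket/key.csv")
fsspec.core.split_protocol("/tmp/key.csv")         # -> (None, "/tmp/key.csv")

# `url_to_fs` resolves a URL to a concrete filesystem plus a normalized path;
# extra keyword arguments (e.g. anon=True for public S3 buckets) are forwarded
# to the underlying filesystem implementation. It only understands lowercase
# protocol names, which is why the changed code lowercases paths like "S3://...".
fs, path = fsspec.core.url_to_fs("s3://bucket/key.csv", anon=True)
fs.exists(path)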
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
@@ -28,6 +28,7 @@ Key Features and Updates
* FIX-#4686: Propagate metadata and drain call queue in unwrap_partitions (#4697)
* FIX-#4652: Support categorical data in `from_dataframe` (#4737)
* FIX-#4756: Correctly propagate `storage_options` in `read_parquet` (#4764)
* FIX-#4657: Use `fsspec` for handling s3/http-like paths instead of `s3fs` (#4710)
* Performance enhancements
* PERF-#4182: Add cell-wise execution for binary ops, fix bin ops for empty dataframes (#4391)
* PERF-#4288: Improve perf of `groupby.mean` for narrow data (#4591)
1 change: 0 additions & 1 deletion examples/cloud/aws_example.yaml
@@ -104,7 +104,6 @@ initialization_commands: []

# List of shell commands to run to set up nodes.
setup_commands:
- pip install s3fs
- pip install modin
- echo 'export MODIN_RAY_CLUSTER=True' >> ~/.bashrc
# Consider uncommenting these if you also want to run apt-get commands during setup
1 change: 0 additions & 1 deletion examples/spreadsheet/requirements.txt
@@ -1,4 +1,3 @@
s3fs
ray==1.1.0
git+https://github.com/modin-project/modin
modin-spreadsheet
@@ -1,5 +1,4 @@
fsspec
s3fs
jupyterlab
ipywidgets
modin[dask]
@@ -1,5 +1,4 @@
fsspec
s3fs
jupyterlab
ipywidgets
tqdm
71 changes: 39 additions & 32 deletions modin/core/io/file_dispatcher.py
@@ -20,14 +20,12 @@

import fsspec
import os
import re
from modin.config import StorageFormat
from modin.utils import import_optional_dependency
from modin.logging import ClassLogger
import numpy as np

S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)")
NOT_IMPLEMENTED_MESSAGE = "Implement in children classes!"
_SUPPORTED_PROTOCOLS = {"s3", "S3", "http", "https"}


class OpenFile:
Expand Down Expand Up @@ -213,7 +211,9 @@ def get_path(cls, file_path):
if `file_path` is an S3 bucket, the parameter will be returned as is, otherwise
an absolute path will be returned.
"""
if isinstance(file_path, str) and S3_ADDRESS_REGEX.search(file_path):
if isinstance(file_path, str) and (
fsspec.core.split_protocol(file_path)[0] in _SUPPORTED_PROTOCOLS
):
return file_path
else:
return os.path.abspath(file_path)
Expand Down Expand Up @@ -257,34 +257,41 @@ def file_exists(cls, file_path, storage_options=None):
bool
Whether file exists or not.
"""
if isinstance(file_path, str):
match = S3_ADDRESS_REGEX.search(file_path)
if match is not None:
if file_path[0] == "S":
file_path = "{}{}".format("s", file_path[1:])
S3FS = import_optional_dependency(
"s3fs", "Module s3fs is required to read S3FS files."
)
from botocore.exceptions import (
NoCredentialsError,
EndpointConnectionError,
)

if storage_options is not None:
new_storage_options = dict(storage_options)
new_storage_options.pop("anon", None)
else:
new_storage_options = {}

s3fs = S3FS.S3FileSystem(anon=False, **new_storage_options)
exists = False
try:
exists = s3fs.exists(file_path) or exists
except (NoCredentialsError, PermissionError, EndpointConnectionError):
pass
s3fs = S3FS.S3FileSystem(anon=True, **new_storage_options)
return exists or s3fs.exists(file_path)
return os.path.exists(file_path)
if (
not isinstance(file_path, str)
or fsspec.core.split_protocol(file_path)[0] not in _SUPPORTED_PROTOCOLS
):
return os.path.exists(file_path)

# `file_path` may start with a capital letter, which isn't supported by `fsspec.core.url_to_fs` used below.
file_path = file_path[0].lower() + file_path[1:]

from botocore.exceptions import (
NoCredentialsError,
EndpointConnectionError,
ConnectTimeoutError,
)

if storage_options is not None:
new_storage_options = dict(storage_options)
new_storage_options.pop("anon", None)
else:
new_storage_options = {}

fs, _ = fsspec.core.url_to_fs(file_path, **new_storage_options)
exists = False
try:
exists = fs.exists(file_path)
except (
NoCredentialsError,
PermissionError,
EndpointConnectionError,
ConnectTimeoutError,
):
fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options)
exists = fs.exists(file_path)

return exists
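
The try/except above is the fsspec version of the old two-step `S3FileSystem(anon=False)` / `S3FileSystem(anon=True)` probe. Condensed into a standalone sketch (the bucket is hypothetical; as in the real method, `storage_options` is assumed not to carry its own `anon` key, which the implementation pops beforehand):

import fsspec
from botocore.exceptions import (
    NoCredentialsError,
    EndpointConnectionError,
    ConnectTimeoutError,
)

def remote_exists(path, **storage_options):
    # Assumes `storage_options` carries no `anon` key of its own;
    # the real method strips it before retrying.
    fs, _ = fsspec.core.url_to_fs(path, **storage_options)
    try:
        return fs.exists(path)
    except (
        NoCredentialsError,
        PermissionError,
        EndpointConnectionError,
        ConnectTimeoutError,
    ):
        # The credentialed probe failed; retry as an anonymous client,
        # which succeeds for public buckets.
        fs, _ = fsspec.core.url_to_fs(path, anon=True, **storage_options)
        return fs.exists(path)

remote_exists("s3://some-public-bucket/data.csv")  # hypothetical bucket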

@classmethod
def deploy(cls, func, *args, num_returns=1, **kwargs): # noqa: PR01
121 changes: 69 additions & 52 deletions modin/core/io/text/csv_glob_dispatcher.py
@@ -20,15 +20,17 @@
import sys
from typing import List, Tuple
import warnings
import fsspec

import pandas
import pandas._libs.lib as lib
from pandas.io.common import is_url

from modin.config import NPartitions
from modin.core.io.file_dispatcher import OpenFile
from modin.core.io.file_dispatcher import S3_ADDRESS_REGEX
from modin.core.io.text.csv_dispatcher import CSVDispatcher
from modin.utils import import_optional_dependency

_SUPPORTED_PROTOCOLS = {"s3", "S3"}


class CSVGlobDispatcher(CSVDispatcher):
Expand Down Expand Up @@ -329,34 +331,43 @@ def file_exists(cls, file_path: str, storage_options=None) -> bool:
bool
True if the path is valid.
"""
if isinstance(file_path, str):
match = S3_ADDRESS_REGEX.search(file_path)
if match is not None:
if file_path[0] == "S":
file_path = "{}{}".format("s", file_path[1:])
S3FS = import_optional_dependency(
"s3fs", "Module s3fs is required to read S3FS files."
)
from botocore.exceptions import (
NoCredentialsError,
EndpointConnectionError,
)
if isinstance(file_path, str) and is_url(file_path):
raise NotImplementedError("`read_csv_glob` supports only s3-like paths.")

if (
not isinstance(file_path, str)
or fsspec.core.split_protocol(file_path)[0] not in _SUPPORTED_PROTOCOLS
):
return len(glob.glob(file_path)) > 0

# `file_path` may start with a capital letter, which isn't supported by `fsspec.core.url_to_fs` used below.
file_path = file_path[0].lower() + file_path[1:]

from botocore.exceptions import (
NoCredentialsError,
EndpointConnectionError,
ConnectTimeoutError,
)

if storage_options is not None:
new_storage_options = dict(storage_options)
new_storage_options.pop("anon", None)
else:
new_storage_options = {}

s3fs = S3FS.S3FileSystem(anon=False, **new_storage_options)
exists = False
try:
exists = len(s3fs.glob(file_path)) > 0 or exists
except (NoCredentialsError, PermissionError, EndpointConnectionError):
pass
s3fs = S3FS.S3FileSystem(anon=True, **new_storage_options)
return exists or len(s3fs.glob(file_path)) > 0
return len(glob.glob(file_path)) > 0
if storage_options is not None:
new_storage_options = dict(storage_options)
new_storage_options.pop("anon", None)
else:
new_storage_options = {}

fs, _ = fsspec.core.url_to_fs(file_path, **new_storage_options)
exists = False
try:
exists = fs.exists(file_path)
except (
NoCredentialsError,
PermissionError,
EndpointConnectionError,
ConnectTimeoutError,
):
fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options)
exists = fs.exists(file_path)
return exists or len(fs.glob(file_path)) > 0
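
Compared with `FileDispatcher.file_exists`, the glob variant has one extra wrinkle: a wildcard path is never an object that `exists`, so the final check also asks whether the pattern matches anything. Roughly (hypothetical bucket):

fs, _ = fsspec.core.url_to_fs("s3://bucket/data*.csv", anon=True)
fs.exists("s3://bucket/data*.csv")         # False: no object has the literal key
len(fs.glob("s3://bucket/data*.csv")) > 0  # True if any object matches the pattern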

@classmethod
def get_path(cls, file_path: str) -> list:
@@ -373,33 +384,39 @@ def get_path(cls, file_path: str) -> list:
list
List of strings of absolute file paths.
"""
if S3_ADDRESS_REGEX.search(file_path):
# S3FS does not allow capital S in s3 addresses.
if file_path[0] == "S":
file_path = "{}{}".format("s", file_path[1:])

S3FS = import_optional_dependency(
"s3fs", "Module s3fs is required to read S3FS files."
)
from botocore.exceptions import NoCredentialsError, EndpointConnectionError

def get_file_path(fs_handle) -> List[str]:
file_paths = fs_handle.glob(file_path)
s3_addresses = ["{}{}".format("s3://", path) for path in file_paths]
return s3_addresses

s3fs = S3FS.S3FileSystem(anon=False)
try:
return get_file_path(s3fs)
except (NoCredentialsError, EndpointConnectionError):
pass
s3fs = S3FS.S3FileSystem(anon=True)
return get_file_path(s3fs)
else:
if fsspec.core.split_protocol(file_path)[0] not in _SUPPORTED_PROTOCOLS:
relative_paths = glob.glob(file_path)
abs_paths = [os.path.abspath(path) for path in relative_paths]
return abs_paths

# `file_path` may start with a capital letter, which isn't supported by `fsspec.core.url_to_fs` used below.
file_path = file_path[0].lower() + file_path[1:]

from botocore.exceptions import (
NoCredentialsError,
EndpointConnectionError,
ConnectTimeoutError,
)

def get_file_path(fs_handle) -> List[str]:
file_paths = fs_handle.glob(file_path)
if len(file_paths) == 0 and not fs_handle.exists(file_path):
raise FileNotFoundError(f"Path <{file_path}> isn't available.")
s3_addresses = [f"s3://{path}" for path in file_paths]
return s3_addresses

fs, _ = fsspec.core.url_to_fs(file_path)
try:
return get_file_path(fs)
except (
NoCredentialsError,
PermissionError,
EndpointConnectionError,
ConnectTimeoutError,
):
fs, _ = fsspec.core.url_to_fs(file_path, anon=True)
return get_file_path(fs)
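
Note the re-prefixing inside `get_file_path`: fsspec filesystems return glob matches as bucket-relative paths without the protocol, so `s3://` has to be prepended before the paths are handed back to the reader. For example (hypothetical bucket):

fs, _ = fsspec.core.url_to_fs("s3://bucket/data*.csv", anon=True)
fs.glob("s3://bucket/data*.csv")  # e.g. ["bucket/data0.csv", "bucket/data1.csv"]
# hence the wrapping back into full addresses:
# [f"s3://{path}" for path in fs.glob("s3://bucket/data*.csv")]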

@classmethod
def partitioned_file(
cls,
73 changes: 43 additions & 30 deletions modin/experimental/pandas/test/test_io_exp.py
@@ -25,6 +25,7 @@
get_unique_filename,
teardown_test_files,
test_data,
eval_general,
)
from modin.test.test_utils import warns_that_defaulting_to_pandas
from modin.pandas.test.utils import parse_dates_values_by_id, time_parsing_csv_path
Expand Down Expand Up @@ -125,7 +126,10 @@ def test_read_csv_empty_frame(self):
def test_read_csv_without_glob(self):
with pytest.warns(UserWarning, match=r"Shell-style wildcard"):
with pytest.raises(FileNotFoundError):
pd.read_csv_glob("s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-")
pd.read_csv_glob(
"s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-",
storage_options={"anon": True},
)

def test_read_csv_glob_4373(self):
columns, filename = ["col0"], "1x1.csv"
Expand Down Expand Up @@ -164,24 +168,25 @@ def test_read_single_csv_with_parse_dates(self, parse_dates):
Engine.get() != "Ray", reason="Currently, glob paths are supported only by the Ray engine."
)
def test_read_multiple_csv_s3():
modin_df = pd.read_csv_glob("S3://noaa-ghcn-pds/csv/178*.csv")

# We have to specify the columns because the column names are not identical. Since we specified the column names, we also have to skip the original column names.
pandas_dfs = [
pandas.read_csv(
"s3://noaa-ghcn-pds/csv/178{}.csv".format(i),
names=modin_df.columns,
skiprows=[0],
)
for i in range(10)
]
pandas_df = pd.concat(pandas_dfs)

# Indexes get messed up when concatting so we reset both.
pandas_df = pandas_df.reset_index(drop=True)
modin_df = modin_df.reset_index(drop=True)
path = "S3://modin-datasets/testing/multiple_csv/test_data*.csv"

df_equals(modin_df, pandas_df)
def _pandas_read_csv_glob(path, storage_options):
pandas_dfs = [
pandas.read_csv(
f"{path.lower().split('*')[0]}{i}.csv", storage_options=storage_options
)
for i in range(2)
]
return pandas.concat(pandas_dfs).reset_index(drop=True)

eval_general(
pd,
pandas,
lambda module, **kwargs: pd.read_csv_glob(path, **kwargs).reset_index(drop=True)
if hasattr(module, "read_csv_glob")
else _pandas_read_csv_glob(path, **kwargs),
storage_options={"anon": True},
)


test_default_to_pickle_filename = "test_default_to_pickle.pkl"
@@ -197,19 +202,27 @@
)
def test_read_multiple_csv_s3_storage_opts(storage_options):
path = "s3://modin-datasets/testing/multiple_csv/"
# Test that `storage_options` is handled correctly
modin_df = pd.read_csv_glob(path, storage_options=storage_options)
pandas_df = pd.concat(
[
pandas.read_csv(
f"{path}test_data{i}.csv",
storage_options=storage_options,
)
for i in range(2)
],
).reset_index(drop=True)

df_equals(modin_df, pandas_df)
def _pandas_read_csv_glob(path, storage_options):
pandas_df = pandas.concat(
[
pandas.read_csv(
f"{path}test_data{i}.csv",
storage_options=storage_options,
)
for i in range(2)
],
).reset_index(drop=True)
return pandas_df

eval_general(
pd,
pandas,
lambda module, **kwargs: pd.read_csv_glob(path, **kwargs)
if hasattr(module, "read_csv_glob")
else _pandas_read_csv_glob(path, **kwargs),
storage_options=storage_options,
)


@pytest.mark.skipif(