Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 endpoint configuration #1169 #1172

Merged
merged 12 commits into from
Nov 22, 2020
5 changes: 4 additions & 1 deletion sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,9 @@ def ingest(
try:
if issubclass(type(feature_table.batch_source), FileSource):
file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
_upload_to_file_source(file_url, with_partitions, dest_path)
_upload_to_file_source(
file_url, with_partitions, dest_path, self._config
)
if issubclass(type(feature_table.batch_source), BigQuerySource):
bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
feature_table_timestamp_column = (
Expand Down Expand Up @@ -1004,6 +1006,7 @@ def get_historical_features(
entity_source = stage_entities_to_fs(
entity_source,
staging_location=self._config.get(opt.SPARK_STAGING_LOCATION),
config=self._config,
)

if self._use_job_service:
Expand Down
15 changes: 12 additions & 3 deletions sdk/python/feast/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from feast.constants import ConfigOptions as opt

_logger = logging.getLogger(__name__)
# Sentinel used by Config.get to distinguish "no default supplied" from an
# explicit default of None (None is a legitimate default value for options).
_UNSET = object()


def _init_config(path: str):
Expand All @@ -50,7 +51,7 @@ def _init_config(path: str):
os.makedirs(os.path.dirname(config_dir), exist_ok=True)

# Create the configuration file itself
config = ConfigParser(defaults=opt().defaults())
config = ConfigParser(allow_no_value=True)
if os.path.exists(path):
config.read(path)

Expand Down Expand Up @@ -113,24 +114,32 @@ def __init__(
self._options = {}
if options and isinstance(options, dict):
self._options = options
self._defaults = opt().defaults()

self._config = config # type: ConfigParser
self._path = path # type: str

def get(self, option, default=_UNSET):
    """
    Returns a single configuration option as a string.

    The lookup is delegated to ConfigParser with a merged ``vars`` mapping.
    Within that mapping, later sources override earlier ones:
    caller-supplied ``default`` < declared option defaults < Feast
    environment variables < constructor-provided options.

    NOTE(review): per configparser semantics, entries passed via ``vars``
    take precedence over values read from the config file itself — confirm
    this ordering is intended.

    Args:
        option: Name of the option
        default: Default value to return if option is not found

    Returns: String option that is returned
    """
    # Only inject the caller's fallback when one was actually supplied;
    # the _UNSET sentinel lets callers pass an explicit default of None.
    default = {option: default} if default is not _UNSET else {}
    return self._config.get(
        CONFIG_FILE_SECTION,
        option,
        vars={
            **default,
            **self._defaults,
            **_get_feast_env_vars(),
            **self._options,
        },
    )

def getboolean(self, option):
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Time to wait for historical feature requests before timing out.
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS: str = "600"

#: Endpoint URL for S3 storage_client
S3_ENDPOINT_URL: Optional[str] = None

#: Authentication Provider - Google OpenID/OAuth
#:
#: Options: "google" / "oauth"
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/loaders/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pyarrow as pa
from pyarrow import parquet as pq

from feast.config import Config
from feast.staging.storage_client import get_staging_client


Expand Down Expand Up @@ -166,7 +167,7 @@ def _read_table_from_source(


def _upload_to_file_source(
file_url: str, with_partitions: bool, dest_path: str
file_url: str, with_partitions: bool, dest_path: str, config: Config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the arguments in the docstring are out of date. Would you mind updating them?

) -> None:
"""
Uploads data into a FileSource. Currently supports GCS, S3 and Local FS.
Expand All @@ -177,7 +178,7 @@ def _upload_to_file_source(
from urllib.parse import urlparse

uri = urlparse(file_url)
staging_client = get_staging_client(uri.scheme)
staging_client = get_staging_client(uri.scheme, config)

if with_partitions:
for path in glob.glob(os.path.join(dest_path, "**/*")):
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/staging/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pandas as pd

from feast.config import Config
from feast.data_format import ParquetFormat
from feast.data_source import BigQuerySource, FileSource
from feast.staging.storage_client import get_staging_client
Expand All @@ -18,15 +19,15 @@


def stage_entities_to_fs(
entity_source: pd.DataFrame, staging_location: str
entity_source: pd.DataFrame, staging_location: str, config: Config
) -> FileSource:
"""
Dumps given (entities) dataframe as parquet file and stage it to remote file storage (subdirectory of staging_location)

:return: FileSource with remote destination path
"""
entity_staging_uri = urlparse(os.path.join(staging_location, str(uuid.uuid4())))
staging_client = get_staging_client(entity_staging_uri.scheme)
staging_client = get_staging_client(entity_staging_uri.scheme, config)
with tempfile.NamedTemporaryFile() as df_export_path:
entity_source.to_parquet(df_export_path.name)
bucket = (
Expand Down
30 changes: 25 additions & 5 deletions sdk/python/feast/staging/storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

from google.auth.exceptions import DefaultCredentialsError

from feast.config import Config
from feast.constants import ConfigOptions as opt

GS = "gs"
S3 = "s3"
LOCAL_FILE = "file"
Expand Down Expand Up @@ -144,15 +147,15 @@ class S3Client(AbstractStagingClient):
Implementation of AbstractStagingClient for Aws S3 storage
"""

def __init__(self, endpoint_url: str = None):
    """
    Create an S3 staging client backed by boto3.

    Args:
        endpoint_url: Optional S3-compatible endpoint override (e.g. for
            MinIO or another S3-compatible store); None uses the default
            AWS endpoint.

    Raises:
        ImportError: If the optional boto3 dependency is not installed.
    """
    try:
        import boto3
    except ImportError:
        # The original message used implicit string concatenation with no
        # separator and rendered as "...s3 staging supportrun ...".
        raise ImportError(
            "Install package boto3 for s3 staging support; "
            "run ```pip install boto3```"
        )
    self.s3_client = boto3.client("s3", endpoint_url=endpoint_url)

def download_file(self, uri: ParseResult) -> IO[bytes]:
"""
Expand Down Expand Up @@ -275,21 +278,38 @@ def upload_file(self, local_path: str, bucket: str, remote_path: str):
shutil.copy(local_path, dest_fpath)


storage_clients = {GS: GCSClient, S3: S3Client, LOCAL_FILE: LocalFSClient}
def _s3_client(config: Config = None):
    """Factory for S3Client, honoring a configured custom endpoint when present."""
    endpoint_url = None if config is None else config.get(opt.S3_ENDPOINT_URL, None)
    return S3Client(endpoint_url=endpoint_url)


def _gcs_client(config: Config = None):
    """Factory for GCSClient; ``config`` is unused but kept for factory-signature parity."""
    return GCSClient()


def _local_fs_client(config: Config = None):
    """Factory for LocalFSClient; ``config`` is unused but kept for factory-signature parity."""
    return LocalFSClient()


storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client}


def get_staging_client(scheme):
def get_staging_client(scheme, config: Config = None):
"""
Initialization of a specific client object(GCSClient, S3Client etc.)

Args:
scheme (str): uri scheme: s3, gs or file
config (Config): additional configuration

Returns:
An object of concrete implementation of AbstractStagingClient
"""
try:
return storage_clients[scheme]()
return storage_clients[scheme](config)
except ValueError:
raise Exception(
f"Could not identify file scheme {scheme}. Only gs://, file:// and s3:// are supported"
Expand Down