Increase Jaeger Tracing coverage (#719)
* Add jaegertracing logs to getOnlineFeatures

* Apply spotless

staging client stubs

strategy pattern

Moving logic and adding s3 support

test cases and some refactoring

returning file object

updated tests

fixed lint error and added moto to requirements-ci
terryyylim authored and jmelinav committed Jun 4, 2020
1 parent f9ce8ee commit f1cc709
Showing 11 changed files with 400 additions and 112 deletions.
17 changes: 3 additions & 14 deletions sdk/python/feast/job.py
@@ -1,12 +1,10 @@
import tempfile
import time
from datetime import datetime, timedelta
from typing import List
from urllib.parse import urlparse

import fastavro
import pandas as pd
from google.cloud import storage
from google.protobuf.json_format import MessageToJson

from feast.core.CoreService_pb2 import ListIngestionJobsRequest
@@ -23,6 +21,7 @@
from feast.serving.ServingService_pb2 import Job as JobProto
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.source import Source
from feast.staging.staging_strategy import StagingStrategy

# Maximum no of seconds to wait until the retrieval jobs status is DONE in Feast
# Currently set to the maximum query execution time limit in BigQuery
@@ -47,8 +46,7 @@ def __init__(
"""
self.job_proto = job_proto
self.serving_stub = serving_stub
# TODO: abstract away GCP depedency
self.gcs_client = storage.Client(project=None)
self.staging_strategy = StagingStrategy()

@property
def id(self):
@@ -126,16 +124,7 @@ def result(self, timeout_sec: int = DEFAULT_TIMEOUT_SEC):
"""
uris = self.get_avro_files(timeout_sec)
for file_uri in uris:
if file_uri.scheme == "gs":
file_obj = tempfile.TemporaryFile()
self.gcs_client.download_blob_to_file(file_uri.geturl(), file_obj)
elif file_uri.scheme == "file":
file_obj = open(file_uri.path, "rb")
else:
raise Exception(
f"Could not identify file URI {file_uri}. Only gs:// and file:// supported"
)

file_obj = self.staging_strategy.execute_file_download(file_uri)
file_obj.seek(0)
avro_reader = fastavro.reader(file_obj)

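The job.py change above collapses the per-scheme branching in RetrievalJob.result() into a single StagingStrategy.execute_file_download call. A minimal usage sketch of that path, assuming a hypothetical gs:// result URI (not part of this commit):

from urllib.parse import urlparse

import fastavro

from feast.staging.staging_strategy import StagingStrategy

# Hypothetical Avro result produced by a retrieval job.
file_uri = urlparse("gs://example-bucket/retrieval/part-0000.avro")

staging_strategy = StagingStrategy()
file_obj = staging_strategy.execute_file_download(file_uri)  # URI scheme selects the protocol
file_obj.seek(0)
for record in fastavro.reader(file_obj):
    print(record)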
116 changes: 20 additions & 96 deletions sdk/python/feast/loaders/file.py
@@ -13,18 +13,18 @@
# limitations under the License.

import os
import re
import shutil
import tempfile
import uuid
from datetime import datetime
from typing import List, Optional, Tuple, Union
from urllib.parse import ParseResult, urlparse
from urllib.parse import urlparse

import pandas as pd
from google.cloud import storage
from pandavro import to_avro

from feast.staging.staging_strategy import StagingStrategy


def export_source_to_staging_location(
source: Union[pd.DataFrame, str], staging_location_uri: str
@@ -58,6 +58,7 @@ def export_source_to_staging_location(
remote staging location.
"""

staging_strategy = StagingStrategy()
uri = urlparse(staging_location_uri)

# Prepare Avro file to be exported to staging location
@@ -66,47 +67,34 @@
uri_path = None # type: Optional[str]
if uri.scheme == "file":
uri_path = uri.path

# Remote gs staging location provided by serving
dir_path, file_name, source_path = export_dataframe_to_local(
df=source, dir_path=uri_path
)
elif urlparse(source).scheme in ["", "file"]:
# Local file provided as a source
dir_path = None
file_name = os.path.basename(source)
source_path = os.path.abspath(
os.path.join(urlparse(source).netloc, urlparse(source).path)
)
elif urlparse(source).scheme == "gs":
# Google Cloud Storage path provided
input_source_uri = urlparse(source)
if "*" in source:
# Wildcard path
return _get_files(bucket=input_source_uri.hostname, uri=input_source_uri)
elif isinstance(source, str):
if urlparse(source).scheme in ["", "file"]:
# Local file provided as a source
dir_path = None
file_name = os.path.basename(source)
source_path = os.path.abspath(
os.path.join(urlparse(source).netloc, urlparse(source).path)
)
else:
return [source]
# gs, s3 file provided as a source.
return staging_strategy.execute_get_source_files(source)
else:
raise Exception(
f"Only string and DataFrame types are allowed as a "
f"source, {type(source)} was provided."
)

# Push data to required staging location
if uri.scheme == "gs":
# Staging location is a Google Cloud Storage path
upload_file_to_gcs(
source_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name
)
elif uri.scheme == "file":
# Staging location is a file path
# Used for end-to-end test
pass
else:
raise Exception(
f"Staging location {staging_location_uri} does not have a "
f"valid URI. Only gs:// and file:// uri scheme are supported."
)
staging_strategy.execute_file_upload(
uri.scheme,
source_path,
uri.hostname,
str(uri.path).strip("/") + "/" + file_name,
)

# Clean up, remove local staging file
if dir_path and isinstance(source, pd.DataFrame) and len(str(dir_path)) > 4:
@@ -160,70 +148,6 @@ def export_dataframe_to_local(
return dir_path, file_name, dest_path


def upload_file_to_gcs(local_path: str, bucket: str, remote_path: str) -> None:
"""
Upload a file from the local file system to Google Cloud Storage (GCS).
Args:
local_path (str):
Local filesystem path of file to upload.
bucket (str):
GCS bucket destination to upload to.
remote_path (str):
Path within GCS bucket to upload file to, includes file name.
Returns:
None:
None
"""

storage_client = storage.Client(project=None)
bucket = storage_client.get_bucket(bucket)
blob = bucket.blob(remote_path)
blob.upload_from_filename(local_path)


def _get_files(bucket: str, uri: ParseResult) -> List[str]:
"""
List all available files within a Google storage bucket that matches a wild
card path.
Args:
bucket (str):
Google Storage bucket to reference.
uri (urllib.parse.ParseResult):
Wild card uri path containing the "*" character.
Example:
* gs://feast/staging_location/*
* gs://feast/staging_location/file_*.avro
Returns:
List[str]:
List of all available files matching the wildcard path.
"""

storage_client = storage.Client(project=None)
bucket = storage_client.get_bucket(bucket)
path = uri.path

if "*" in path:
regex = re.compile(path.replace("*", ".*?").strip("/"))
blob_list = bucket.list_blobs(
prefix=path.strip("/").split("*")[0], delimiter="/"
)
# File path should not be in path (file path must be longer than path)
return [
f"{uri.scheme}://{uri.hostname}/{file}"
for file in [x.name for x in blob_list]
if re.match(regex, file) and file not in path
]
else:
raise Exception(f"{path} is not a wildcard path")


def _get_file_name() -> str:
"""
Create a random file name.
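The file.py changes above route uploads and wildcard source resolution through StagingStrategy in place of the removed GCS-specific helpers. A hedged sketch of the resulting export flow, with hypothetical buckets and paths (not part of this commit, and assuming the DataFrame branch not shown in the diff context):

import pandas as pd

from feast.loaders.file import export_source_to_staging_location

df = pd.DataFrame({"entity_id": [1, 2], "feature_value": [0.1, 0.2]})

# DataFrame source: written to a local Avro file, then uploaded to the
# staging bucket via StagingStrategy.execute_file_upload.
staged_uris = export_source_to_staging_location(df, "s3://example-bucket/staging")

# String source with a wildcard: expanded to concrete object URIs through
# StagingStrategy.execute_get_source_files.
matched_uris = export_source_to_staging_location(
    "gs://example-bucket/staging/file_*.avro", "gs://example-bucket/staging"
)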
Empty file.
162 changes: 162 additions & 0 deletions sdk/python/feast/staging/staging_strategy.py
@@ -0,0 +1,162 @@
import re
from abc import ABC, ABCMeta, abstractmethod
from enum import Enum
from tempfile import TemporaryFile
from typing import List
from urllib.parse import ParseResult, urlparse

import boto3
from google.cloud import storage


class PROTOCOL(Enum):
GS = "gs"
S3 = "s3"
LOCAL_FILE = "file"


class StagingStrategy:
def __init__(self):
self._protocol_dict = dict()

def execute_file_download(self, file_uri: ParseResult) -> TemporaryFile:
protocol = self._get_staging_protocol(file_uri.scheme)
return protocol.download_file(file_uri)

def execute_get_source_files(self, source: str) -> List[str]:
uri = urlparse(source)
if "*" in uri.path:
protocol = self._get_staging_protocol(uri.scheme)
return protocol.list_files(bucket=uri.hostname, uri=uri)
elif PROTOCOL(uri.scheme) in [PROTOCOL.S3, PROTOCOL.GS]:
return [source]
else:
raise Exception(
f"Could not identify file protocol {uri.scheme}. Only gs:// and file:// and s3:// supported"
)

def execute_file_upload(
self, scheme: str, local_path: str, bucket: str, remote_path: str
):
protocol = self._get_staging_protocol(scheme)
return protocol.upload_file(local_path, bucket, remote_path)

def _get_staging_protocol(self, protocol):
if protocol in self._protocol_dict:
return self._protocol_dict[protocol]
else:
if PROTOCOL(protocol) == PROTOCOL.GS:
self._protocol_dict[protocol] = GCSProtocol()
elif PROTOCOL(protocol) == PROTOCOL.S3:
self._protocol_dict[protocol] = S3Protocol()
elif PROTOCOL(protocol) == PROTOCOL.LOCAL_FILE:
self._protocol_dict[protocol] = LocalFSProtocol()
else:
raise Exception(
f"Could not identify file protocol {protocol}. Only gs:// and file:// and s3:// supported"
)
return self._protocol_dict[protocol]


class AbstractStagingProtocol(ABC):

__metaclass__ = ABCMeta

@abstractmethod
def __init__(self):
pass

@abstractmethod
def download_file(self, uri: ParseResult) -> TemporaryFile:
pass

@abstractmethod
def list_files(self, bucket: str, uri: ParseResult) -> List[str]:
pass

@abstractmethod
def upload_file(self, local_path: str, bucket: str, remote_path: str):
pass


class GCSProtocol(AbstractStagingProtocol):
def __init__(self):
self.gcs_client = storage.Client(project=None)

def download_file(self, uri: ParseResult) -> TemporaryFile:
url = uri.geturl()
file_obj = TemporaryFile()
self.gcs_client.download_blob_to_file(url, file_obj)
return file_obj

def list_files(self, bucket: str, uri: ParseResult) -> List[str]:
bucket = self.gcs_client.get_bucket(bucket)
path = uri.path

if "*" in path:
regex = re.compile(path.replace("*", ".*?").strip("/"))
blob_list = bucket.list_blobs(
prefix=path.strip("/").split("*")[0], delimiter="/"
)
# File path should not be in path (file path must be longer than path)
return [
f"{uri.scheme}://{uri.hostname}/{file}"
for file in [x.name for x in blob_list]
if re.match(regex, file) and file not in path
]
else:
raise Exception(f"{path} is not a wildcard path")

def upload_file(self, local_path: str, bucket: str, remote_path: str):
bucket = self.gcs_client.get_bucket(bucket)
blob = bucket.blob(remote_path)
blob.upload_from_filename(local_path)


class S3Protocol(AbstractStagingProtocol):
def __init__(self):
self.s3_client = boto3.client("s3")

def download_file(self, uri: ParseResult) -> TemporaryFile:
url = uri.path[1:] # removing leading / from the path
bucket = uri.hostname
file_obj = TemporaryFile()
self.s3_client.download_fileobj(bucket, url, file_obj)
return file_obj

def list_files(self, bucket: str, uri: ParseResult) -> List[str]:
path = uri.path

if "*" in path:
regex = re.compile(path.replace("*", ".*?").strip("/"))
blob_list = self.s3_client.list_objects(
Bucket=bucket, Prefix=path.strip("/").split("*")[0], Delimiter="/"
)
# File path should not be in path (file path must be longer than path)
return [
f"{uri.scheme}://{uri.hostname}/{file}"
for file in [x["Key"] for x in blob_list["Contents"]]
if re.match(regex, file) and file not in path
]
else:
raise Exception(f"{path} is not a wildcard path")

def upload_file(self, local_path: str, bucket: str, remote_path: str):
with open(local_path, "rb") as file:
self.s3_client.upload_fileobj(file, bucket, remote_path)


class LocalFSProtocol(AbstractStagingProtocol):
def __init__(self):
pass

def download_file(self, file_uri: ParseResult) -> TemporaryFile:
url = file_uri.path
file_obj = open(url, "rb")
return file_obj

def list_files(self, bucket: str, uri: ParseResult) -> List[str]:
raise NotImplementedError("list file not implemented for Local file")

def upload_file(self, local_path: str, bucket: str, remote_path: str):
pass # For test cases
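A brief sketch (not part of the commit) exercising the three StagingStrategy entry points; bucket and key names are hypothetical:

from urllib.parse import urlparse

from feast.staging.staging_strategy import StagingStrategy

strategy = StagingStrategy()

# Download: the URI scheme selects GCSProtocol, S3Protocol, or LocalFSProtocol;
# handlers are created lazily and cached in _protocol_dict.
avro_file = strategy.execute_file_download(
    urlparse("s3://example-bucket/staging/part-0000.avro")
)

# Source resolution: wildcard paths are expanded via list_files, while plain
# gs:// or s3:// URIs are returned as-is.
sources = strategy.execute_get_source_files("s3://example-bucket/staging/file_*.avro")

# Upload: the scheme argument picks the protocol, the rest is passed through.
strategy.execute_file_upload("s3", "/tmp/local.avro", "example-bucket", "staging/local.avro")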
3 changes: 2 additions & 1 deletion sdk/python/requirements-ci.txt
@@ -10,4 +10,5 @@ pytest-timeout
pytest-ordering==0.6.*
pandas==0.*
mock==2.0.0
pandavro==1.5.*
pandavro==1.5.*
moto
4 changes: 3 additions & 1 deletion sdk/python/requirements-dev.txt
@@ -35,4 +35,6 @@ mypy
mypy-protobuf
pre-commit
flake8
black
black
boto3
moto
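boto3 and moto are added so the new S3Protocol can be exercised without real AWS access. A hedged test sketch, assuming a pytest-style test module with hypothetical bucket and key names (not part of this commit):

import os
from urllib.parse import urlparse

import boto3
from moto import mock_s3

from feast.staging.staging_strategy import S3Protocol


@mock_s3
def test_s3_protocol_upload_and_download(tmp_path):
    # moto intercepts boto3 S3 calls; a default region keeps client creation happy.
    os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")
    boto3.client("s3").create_bucket(Bucket="test-bucket")

    local_file = tmp_path / "data.avro"
    local_file.write_bytes(b"avro-bytes")

    protocol = S3Protocol()
    protocol.upload_file(str(local_file), "test-bucket", "staging/data.avro")

    file_obj = protocol.download_file(urlparse("s3://test-bucket/staging/data.avro"))
    file_obj.seek(0)
    assert file_obj.read() == b"avro-bytes"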