Skip to content

Commit

Permalink
Merge pull request feast-dev#2 from leonid133/feature/registry_s3
Browse files Browse the repository at this point in the history
Feature/registry s3
  • Loading branch information
leonid133 authored Apr 20, 2021
2 parents 4ef14a9 + b012a07 commit 575f145
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 10 deletions.
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def materialize_incremental_command(end_ts: str, views: List[str]):
@click.option(
"--template",
"-t",
type=click.Choice(["local", "gcp"], case_sensitive=False),
type=click.Choice(["local", "gcp", "aws_dynamo"], case_sensitive=False),
help="Specify a template for the created project",
default="local",
)
Expand Down
7 changes: 4 additions & 3 deletions sdk/python/feast/infra/aws_dynamo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
class AwsDynamoProvider(Provider):
_aws_project_id: Optional[str]

def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
if config:
self._aws_project_id = config.project_id
def __init__(self, config: Optional[RepoConfig]):
if config and config.online_store and config.online_store.project_id:
self._aws_project_id = config.online_store.project_id
else:
self._aws_project_id = None

Expand All @@ -48,6 +48,7 @@ def update_infra(
client = self._initialize_client()

for table_name in tables_to_keep:
# TODO: add table creation to dynamo.
table = client.Table(table_name.name)
table.update_item(
Key={
Expand Down
77 changes: 77 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def __init__(self, registry_path: str, cache_ttl: timedelta):
uri = urlparse(registry_path)
if uri.scheme == "gs":
self._registry_store: RegistryStore = GCSRegistryStore(registry_path)
elif uri.scheme == "s3":
self._registry_store: RegistryStore = AwsS3RegistryStore(registry_path)
elif uri.scheme == "file" or uri.scheme == "":
self._registry_store = LocalRegistryStore(registry_path)
else:
Expand Down Expand Up @@ -477,3 +479,78 @@ def _write_registry(self, registry_proto: RegistryProto):
file_obj.seek(0)
blob.upload_from_file(file_obj)
return


class AwsS3RegistryStore(RegistryStore):
def __init__(self, uri: str):
try:
import boto3
except ImportError:
raise ImportError(
"Install package boto3==1.17.* for gcs support"
"run ```pip install boto3==1.17.*```"
)
self._uri = urlparse(uri)
self._bucket = self._uri.hostname
self._key = self._uri.path.lstrip("/")
return

def get_registry_proto(self):
import boto3
import botocore

file_obj = TemporaryFile()
registry_proto = RegistryProto()
s3 = boto3.resource('s3')
try:
bucket = s3.Bucket(self._bucket)
s3.meta.client.head_bucket(Bucket=bucket.name)
except botocore.client.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the bucket does not exist.
error_code = int(e.response['Error']['Code'])
if error_code == 404:
raise Exception(
f"No bucket named {self._bucket} exists; please create it first."
)
else:
raise Exception(f'Private Registry Bucket {self._bucket}. Forbidden Access!')

try:
obj = bucket.Object(self._key)
obj.download_fileobj(file_obj)
file_obj.seek(0)
registry_proto.ParseFromString(file_obj.read())
return registry_proto
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
raise FileNotFoundError(
f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
)
else:
raise FileNotFoundError(
f'Registry is not able to locate data under path "{self._uri.geturl()}" with [original error]: {e.response}'
)

def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto]):
try:
registry_proto = self.get_registry_proto()
except FileNotFoundError:
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
registry_proto = updater(registry_proto)
self._write_registry(registry_proto)
return

def _write_registry(self, registry_proto: RegistryProto):
import boto3
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
# we have already checked the bucket exists so no need to do it again
registry_bucket = boto3.resource('s3').Bucket(self._bucket)
registry_db = registry_bucket.Object(self._key)
file_obj = TemporaryFile()
file_obj.write(registry_proto.SerializeToString())
file_obj.seek(0)
registry_db.upload_fileobj(file_obj)
return
19 changes: 13 additions & 6 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ class DatastoreOnlineStoreConfig(FeastBaseModel):
project_id: Optional[StrictStr] = None
""" (optional) GCP Project Id """

class DynamoOnlineStoreConfig(FeastBaseModel):
"""Online store config for DynamoDB store"""
type: Literal["dynamo"] = "dynamo"
"""Online store type selector"""
project_id: Optional[StrictStr] = None

OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig]
OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig, DynamoOnlineStoreConfig]


class RegistryConfig(FeastBaseModel):
Expand Down Expand Up @@ -63,7 +68,7 @@ class RepoConfig(FeastBaseModel):
"""

provider: StrictStr
""" str: local or gcp """
""" str: local or gcp or aws_dynamo """

online_store: OnlineStoreConfig = SqliteOnlineStoreConfig()
""" OnlineStoreConfig: Online store configuration (optional depending on provider) """
Expand Down Expand Up @@ -102,18 +107,20 @@ def _validate_online_store_config(cls, values):
values["online_store"]["type"] = "datastore"
elif values["provider"] == "aws_dynamo":
values["online_store"]["type"] = "datastore"

elif values["provider"] == "aws_dynamo":
values["online_store"]["type"] = "dynamo"
online_store_type = values["online_store"]["type"]

# Make sure the user hasn't provided the wrong type
assert online_store_type in ["datastore", "sqlite"]
assert online_store_type in ["datastore", "sqlite", "dynamo"]

# Validate the dict to ensure one of the union types match
try:
if online_store_type == "sqlite":
SqliteOnlineStoreConfig(**values["online_store"])
elif values["online_store"]["type"] == "datastore":
elif online_store_type == "datastore":
DatastoreOnlineStoreConfig(**values["online_store"])
elif online_store_type == "dynamo":
DynamoOnlineStoreConfig(**values["online_store"])
else:
raise ValidationError(
f"Invalid online store type {online_store_type}"
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/templates/aws_dynamo/feature_store.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
project: my_project
registry: s3://feast-provectus-demo/data/registry.db
provider: aws_dynamo
online_store:

0 comments on commit 575f145

Please sign in to comment.