Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add native types for Python SDK online retrieval #826

Merged
merged 21 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from collections import OrderedDict
from math import ceil
from typing import Any, Dict, List, Optional, Tuple, Union
import warnings

import grpc
import pandas as pd
Expand Down Expand Up @@ -95,6 +96,8 @@

CPU_COUNT = os.cpu_count() # type: int

warnings.simplefilter("always", DeprecationWarning)


class Client:
"""
Expand Down Expand Up @@ -687,7 +690,10 @@ def get_online_features(
Each EntityRow provided will yield one record, which contains
data fields with data value and field status metadata (if included).
"""

warnings.warn(
woop marked this conversation as resolved.
Show resolved Hide resolved
"entity_rows parameter will only be accepting Dict format from v0.7 onwards",
DeprecationWarning,
)
try:
if entity_rows and isinstance(entity_rows[0], dict):
woop marked this conversation as resolved.
Show resolved Hide resolved
entity_rows = _infer_entity_rows(entity_rows)
Expand Down Expand Up @@ -1004,6 +1010,12 @@ def _get_grpc_metadata(self):
return ()


def _is_mixed_type(entity_list: List[Any]) -> bool:
iseq = iter(entity_list)
woop marked this conversation as resolved.
Show resolved Hide resolved
first_type = type(next(iseq))
return False if all((type(x) is first_type) for x in iseq) else True


def _infer_entity_rows(
entities: List[Dict[str, Any]]
) -> List[GetOnlineFeaturesRequest.EntityRow]:
Expand All @@ -1018,11 +1030,29 @@ def _infer_entity_rows(
A list of EntityRow protos parsed from args.
"""
entity_row_list = []
temp_dtype_storage = dict()
mrzzy marked this conversation as resolved.
Show resolved Hide resolved

is_mixed_type = False
for entity in entities:
for key, value in entity.items():
if isinstance(value, list):
is_mixed_type = _is_mixed_type(value)
# Infer the specific type for this row
current_dtype = python_type_to_feast_value_type(name=key, value=value)
woop marked this conversation as resolved.
Show resolved Hide resolved

if is_mixed_type:
woop marked this conversation as resolved.
Show resolved Hide resolved
raise TypeError(
f"Input entity {key} of List type has mixed types and that is not allowed. "
)
if key not in temp_dtype_storage:
temp_dtype_storage[key] = current_dtype
else:
if current_dtype == temp_dtype_storage[key]:
woop marked this conversation as resolved.
Show resolved Hide resolved
pass
else:
raise TypeError(
f"Input entity {key} has mixed types and that is not allowed. "
)
proto_value = _python_value_to_proto_value(current_dtype, value)
entity_row_list.append(
GetOnlineFeaturesRequest.EntityRow(fields={key: proto_value})
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ def to_dict(self) -> Dict[str, Any]:
Converts GetOnlineFeaturesResponse features into a dictionary form.
"""
features = [k for row in self.field_values for k, _ in row.fields.items()]
mrzzy marked this conversation as resolved.
Show resolved Hide resolved
features_dict = dict.fromkeys(features)
features_dict = {k: list() for k in features}

for row in self.field_values:
for feature in features_dict.keys():
native_type_value = feast_value_type_to_python_type(row.fields[feature])
features_dict[feature] = native_type_value
features_dict[feature].append(native_type_value)

return features_dict
13 changes: 12 additions & 1 deletion sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
for k, v in field_value_dict.items():
if k == "int64Val":
return int(v)
if k == "bytesVal":
return bytes(v)
if (k == "int64ListVal") or (k == "int32ListVal"):
return [int(item) for item in v["val"]]
if (k == "floatListVal") or (k == "doubleListVal"):
Expand All @@ -66,7 +68,13 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
if k == "boolListVal":
return [bool(item) for item in v["val"]]

return v
if k in ["int32Val", "floatVal", "doubleVal", "stringVal", "boolVal"]:
mrzzy marked this conversation as resolved.
Show resolved Hide resolved
return v
else:
raise TypeError(
f"Casting to Python native type for type {k} failed. "
f"Type {k} not found"
)


def python_type_to_feast_value_type(
Expand All @@ -87,6 +95,9 @@ def python_type_to_feast_value_type(
"""

type_name = type(value).__name__
if isinstance(value, list):
type_name = "ndarray"
value = np.asarray(value)
mrzzy marked this conversation as resolved.
Show resolved Hide resolved

type_map = {
"int": ValueType.INT64,
Expand Down
147 changes: 114 additions & 33 deletions tests/e2e/redis/basic-ingest-redis-serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,66 +750,147 @@ def list_type_dataframe():
{
"datetime": [time_offset] * N_ROWS,
"customer_id": [i for i in range(N_ROWS)],
"rating": [i for i in range(N_ROWS)],
"cost": [float(i)+0.5 for i in range(N_ROWS)],
"past_transactions_int": [[i,i+2] for i in range(N_ROWS)],
"past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)],
"past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)],
"past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)],
"past_transactions_bool": [[True,False] for _ in range(N_ROWS)]
"customer2_rating": [i for i in range(N_ROWS)],
"customer2_cost": [float(i)+0.5 for i in range(N_ROWS)],
"customer2_past_transactions_int": [[i,i+2] for i in range(N_ROWS)],
"customer2_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)],
"customer2_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)],
"customer2_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)],
"customer2_past_transactions_bool": [[True,False] for _ in range(N_ROWS)]
}
)
return customer_df

@pytest.fixture(scope='module')
def entity_list_type_dataframe():
N_ROWS = 2
time_offset = datetime.utcnow().replace(tzinfo=pytz.utc)
customer_df = pd.DataFrame(
{
"datetime": [time_offset] * N_ROWS,
"district_ids": [[np.int64(i),np.int64(i+1),np.int64(i+2)] for i in range(N_ROWS)],
"district_rating": [i for i in range(N_ROWS)],
"district_cost": [float(i)+0.5 for i in range(N_ROWS)],
"district_past_transactions_int": [[i,i+2] for i in range(N_ROWS)],
"district_past_transactions_double": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)],
"district_past_transactions_float": [[float(i)+0.5,float(i)+2] for i in range(N_ROWS)],
"district_past_transactions_string": [['first_'+str(i),'second_'+str(i)] for i in range(N_ROWS)],
"district_past_transactions_bool": [[True,False] for _ in range(N_ROWS)]
}
)
return customer_df


@pytest.mark.timeout(600)
@pytest.mark.run(order=43)
def test_basic_retrieve_online_dict(client, list_type_dataframe):
def test_basic_retrieve_online_dict(client, list_type_dataframe, entity_list_type_dataframe):
# Case 1: Multiple entities check
customer_fs = FeatureSet(
name="customer2",
features=[
Feature(name="rating", dtype=ValueType.INT64, labels={"key1":"val1"}),
Feature(name="cost", dtype=ValueType.FLOAT),
Feature(name="past_transactions_int", dtype=ValueType.INT64_LIST),
Feature(name="past_transactions_double", dtype=ValueType.DOUBLE_LIST),
Feature(name="past_transactions_float", dtype=ValueType.FLOAT_LIST),
Feature(name="past_transactions_string", dtype=ValueType.STRING_LIST),
Feature(name="past_transactions_bool", dtype=ValueType.BOOL_LIST)
Feature(name="customer2_rating", dtype=ValueType.INT64, labels={"key1":"val1"}),
Feature(name="customer2_cost", dtype=ValueType.FLOAT),
Feature(name="customer2_past_transactions_int", dtype=ValueType.INT64_LIST),
Feature(name="customer2_past_transactions_double", dtype=ValueType.DOUBLE_LIST),
Feature(name="customer2_past_transactions_float", dtype=ValueType.FLOAT_LIST),
Feature(name="customer2_past_transactions_string", dtype=ValueType.STRING_LIST),
Feature(name="customer2_past_transactions_bool", dtype=ValueType.BOOL_LIST)
],
entities=[Entity("customer_id", ValueType.INT64)],
max_age=Duration(seconds=600)
max_age=Duration(seconds=3600)
)

client.set_project(PROJECT_NAME)
client.apply(customer_fs)

customer_fs = client.get_feature_set(name="customer2")
client.ingest(customer_fs, list_type_dataframe, timeout=300)
client.ingest(customer_fs, list_type_dataframe, timeout=600)
time.sleep(15)

online_request_entity = [{"customer_id": 1}]
online_request_entity = [{"customer_id": 0},{"customer_id": 1}]
online_request_features = [
"rating",
"cost",
"past_transactions_int",
"past_transactions_double",
"past_transactions_float",
"past_transactions_string",
"past_transactions_bool"
"customer2_rating",
"customer2_cost",
"customer2_past_transactions_int",
"customer2_past_transactions_double",
"customer2_past_transactions_float",
"customer2_past_transactions_string",
"customer2_past_transactions_bool"
]

online_features_actual = client.get_online_features(entity_rows=online_request_entity, feature_refs=online_request_features)
online_features_expected = {
"customer_id": 1,
"rating": 1,
"cost": 1.5,
"past_transactions_int": [1,3],
"past_transactions_double": [1.5,3.0],
"past_transactions_float": [1.5,3.0],
"past_transactions_string": ['first_1','second_1'],
"past_transactions_bool": [True,False]
"customer_id": [0,1],
"customer2_rating": [0,1],
"customer2_cost": [0.5,1.5],
"customer2_past_transactions_int": [[0,2],[1,3]],
"customer2_past_transactions_double": [[0.5,2.0],[1.5,3.0]],
"customer2_past_transactions_float": [[0.5,2.0],[1.5,3.0]],
"customer2_past_transactions_string": [['first_0','second_0'],['first_1','second_1']],
"customer2_past_transactions_bool": [[True,False],[True,False]]
}

# Case 2: List entity check
district_fs = FeatureSet(
name="district",
features=[
Feature(name="district_rating", dtype=ValueType.INT64, labels={"key1":"val1"}),
Feature(name="district_cost", dtype=ValueType.FLOAT),
Feature(name="district_past_transactions_int", dtype=ValueType.INT64_LIST),
Feature(name="district_past_transactions_double", dtype=ValueType.DOUBLE_LIST),
Feature(name="district_past_transactions_float", dtype=ValueType.FLOAT_LIST),
Feature(name="district_past_transactions_string", dtype=ValueType.STRING_LIST),
Feature(name="district_past_transactions_bool", dtype=ValueType.BOOL_LIST)
],
entities=[Entity("district_ids", dtype=ValueType.INT64_LIST)],
max_age=Duration(seconds=3600)
)

client.set_project(PROJECT_NAME)
client.apply(district_fs)

district_fs = client.get_feature_set(name="district")
client.ingest(district_fs, entity_list_type_dataframe, timeout=600)
time.sleep(15)

online_request_entity2 = [{"district_ids": [np.int64(1),np.int64(2),np.int64(3)]}]
online_request_features2 = [
"district_rating",
"district_cost",
"district_past_transactions_int",
"district_past_transactions_double",
"district_past_transactions_float",
"district_past_transactions_string",
"district_past_transactions_bool"
]

online_features_actual2 = client.get_online_features(entity_rows=online_request_entity2, feature_refs=online_request_features2)
online_features_expected2 = {
"district_ids": [[np.int64(1),np.int64(2),np.int64(3)]],
"district_rating": [1],
"district_cost": [1.5],
"district_past_transactions_int": [[1,3]],
"district_past_transactions_double": [[1.5,3.0]],
"district_past_transactions_float": [[1.5,3.0]],
"district_past_transactions_string": [['first_1','second_1']],
"district_past_transactions_bool": [[True,False]]
}

assert online_features_actual.to_dict() == online_features_expected
assert online_features_actual2.to_dict() == online_features_expected2

# Case 3: Entity check with mixed types
with pytest.raises(TypeError) as excinfo:
online_request_entity3 = [{"customer_id": 0},{"customer_id": "error_pls"}]
online_features_actual3 = client.get_online_features(entity_rows=online_request_entity3, feature_refs=online_request_features)

# Case 4: List entity check with mixed types
with pytest.raises(TypeError) as excinfo2:
online_request_entity4 = [{"district_ids": [np.int64(1),np.int64(2),True]}]
online_features_actual4 = client.get_online_features(entity_rows=online_request_entity4, feature_refs=online_request_features2)

assert "Input entity customer_id has mixed types and that is not allowed." in str(excinfo.value)
assert "Input entity district_ids of List type has mixed types and that is not allowed." in str(excinfo2.value)


@pytest.mark.timeout(900)
Expand Down