Skip to content

Commit

Permalink
Add e2e http server test & fix logging bug
Browse files Browse the repository at this point in the history
Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
  • Loading branch information
Tsotne Tabidze committed May 19, 2022
1 parent ad301f3 commit 8168517
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
})
}

err = logger.Log(entitiesProto, featureVectorProtos, featureNames, requestContextProto, requestId)
err = logger.Log(entitiesProto, featureVectorProtos, featureNames[len(request.Entities):], requestContextProto, requestId)
if err != nil {
http.Error(w, fmt.Sprintf("LoggerImpl error[%s]: %+v", featureService.Name, err), http.StatusInternalServerError)
return
Expand Down
60 changes: 57 additions & 3 deletions sdk/python/tests/integration/e2e/test_go_feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pandas as pd
import pytest
import pytz
import requests

from feast import FeatureService, ValueType
from feast.embedded_go.lib.embedded import LoggingOptions
Expand Down Expand Up @@ -61,8 +62,7 @@ def initialized_registry(environment, universal_data_sources):
fs.materialize(environment.start_date, environment.end_date)


@pytest.fixture
def grpc_server_port(environment, initialized_registry):
def server_port(environment, server_type: str):
if not environment.test_repo_config.go_feature_retrieval:
pytest.skip("Only for Go path")

Expand All @@ -72,9 +72,15 @@ def grpc_server_port(environment, initialized_registry):
repo_path=str(fs.repo_path.absolute()), repo_config=fs.config, feature_store=fs,
)
port = free_port()
if server_type == "grpc":
target = embedded.start_grpc_server
elif server_type == "http":
target = embedded.start_http_server
else:
raise ValueError("Server Type must be either 'http' or 'grpc'")

t = threading.Thread(
target=embedded.start_grpc_server,
target=target,
args=("127.0.0.1", port),
kwargs=dict(
enable_logging=True,
Expand All @@ -98,6 +104,16 @@ def grpc_server_port(environment, initialized_registry):
time.sleep(2)


@pytest.fixture
def grpc_server_port(environment, initialized_registry):
yield from server_port(environment, "grpc")


@pytest.fixture
def http_server_port(environment, initialized_registry):
yield from server_port(environment, "http")


@pytest.fixture
def grpc_client(grpc_server_port):
ch = grpc.insecure_channel(f"localhost:{grpc_server_port}")
Expand Down Expand Up @@ -130,6 +146,44 @@ def test_go_grpc_server(grpc_client):
assert all([s == FieldStatus.PRESENT for s in vector.statuses])


@pytest.mark.integration
@pytest.mark.goserver
def test_go_http_server(http_server_port):
response = requests.post(
f"http://localhost:{http_server_port}/get-online-features",
json={
"feature_service": "driver_features",
"entities": {"driver_id": [5001, 5002]},
"full_feature_names": True,
},
)
assert response.status_code == 200, response.text
response = response.json()
assert set(response.keys()) == {"metadata", "results"}
metadata = response["metadata"]
results = response["results"]
assert response["metadata"] == {
"feature_names": [
"driver_id",
"driver_stats__conv_rate",
"driver_stats__acc_rate",
"driver_stats__avg_daily_trips",
]
}, metadata
assert len(results) == 4, results
assert all(
set(result.keys()) == {"event_timestamps", "statuses", "values"}
for result in results
), results
assert all(
result["statuses"] == ["PRESENT", "PRESENT"] for result in results
), results
assert results[0]["values"] == [5001, 5002], results
for result in results[1:]:
assert len(result["values"]) == 2, result
assert all(value is not None for value in result["values"]), result


@pytest.mark.integration
@pytest.mark.goserver
@pytest.mark.universal_offline_stores
Expand Down

0 comments on commit 8168517

Please sign in to comment.