Skip to content

Commit

Permalink
feat: CLI command 'feast serve' should start go-based server if flag …
Browse files Browse the repository at this point in the history
…is enabled (#2617)

* CLI command 'feast serve' should start go-based server if flag is enabled

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Format stuff

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>

* Split grpc & http methods in python and sort imports correctly

Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
  • Loading branch information
Tsotne Tabidze authored Apr 27, 2022
1 parent 134dc5f commit f3ff812
Show file tree
Hide file tree
Showing 21 changed files with 83 additions and 101 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
cd sdk/python && python setup.py build_go_protos

compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies
cd sdk/python && python setup.py build_go_lib
cd sdk/python && COMPILE_GO=True python setup.py build_ext --inplace

# Needs feast package to setup the feature store
test-go: compile-protos-go
Expand All @@ -178,7 +178,7 @@ format-go:
gofmt -s -w go/

lint-go: compile-protos-go
go vet ./go/internal/feast ./go/cmd/server
go vet ./go/internal/feast ./go/embedded

# Docker

Expand Down
File renamed without changes.
Binary file not shown.
74 changes: 0 additions & 74 deletions go/cmd/server/main.go

This file was deleted.

38 changes: 38 additions & 0 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ package embedded
import (
"context"
"fmt"
"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
"google.golang.org/grpc"
"log"
"net"
"os"
"os/signal"
"syscall"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
Expand Down Expand Up @@ -198,6 +206,36 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
return nil
}

func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
// TODO(oleksii): enable logging
// Disable logging for now
var loggingService *logging.LoggingService = nil
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return err
}
grpcServer := grpc.NewServer()
serving.RegisterServingServiceServer(grpcServer, ser)

// Notify this channel when receiving interrupt or termination signals from OS
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-c
fmt.Println("Stopping the gRPC server...")
grpcServer.GracefulStop()
}()

err = grpcServer.Serve(lis)
if err != nil {
return err
}
return nil
}

/*
Read Record Batch from memory managed by Python caller.
Python part uses C ABI interface to export this record into C Data Interface,
Expand Down
Empty file added go/internal/__init__.py
Empty file.
Empty file added go/internal/feast/__init__.py
Empty file.
8 changes: 5 additions & 3 deletions go/internal/feast/onlinestore/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type OnlineStore interface {

func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool) {
if onlineStoreType, ok := onlineStoreConfig["type"]; !ok {
return "", false
// If online store type isn't specified, default to sqlite
return "sqlite", true
} else {
result, ok := onlineStoreType.(string)
return result, ok
Expand All @@ -53,10 +54,11 @@ func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool)
func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
onlineStoreType, ok := getOnlineStoreType(config.OnlineStore)
if !ok {
return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore)
} else if onlineStoreType == "sqlite" {
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
}
if onlineStoreType == "redis" {
} else if onlineStoreType == "redis" {
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
return onlineStore, err
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
package main
package server

import (
"context"

"github.com/feast-dev/feast/go/cmd/server/logging"
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/google/uuid"
)

type servingServiceServer struct {
const feastServerVersion = "0.0.1"

type grpcServingServiceServer struct {
fs *feast.FeatureStore
loggingService *logging.LoggingService
serving.UnimplementedServingServiceServer
}

func newServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *servingServiceServer {
return &servingServiceServer{fs: fs, loggingService: loggingService}
func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *grpcServingServiceServer {
return &grpcServingServiceServer{fs: fs, loggingService: loggingService}
}

func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) {
func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) {
return &serving.GetFeastServingInfoResponse{
Version: feastServerVersion,
}, nil
Expand All @@ -30,7 +32,7 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request
// Returns an object containing the response to GetOnlineFeatures.
// Metadata contains featurenames that corresponds to the number of rows in response.Results.
// Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format.
func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
requestId := GenerateRequestId()
featuresOrService, err := s.fs.ParseFeatures(request.GetKind())
if err != nil {
Expand Down Expand Up @@ -74,7 +76,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
EventTimestamps: vector.Timestamps,
})
}
if featuresOrService.FeatureService != nil {
if featuresOrService.FeatureService != nil && s.loggingService != nil {
go s.loggingService.GenerateLogs(featuresOrService.FeatureService, entityValuesMap, resp.Results[len(request.Entities):], request.RequestContext, requestId)
}
return resp, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package server

import (
"context"
Expand All @@ -16,8 +16,8 @@ import (
"github.com/apache/arrow/go/v8/arrow/memory"
"github.com/apache/arrow/go/v8/parquet/file"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/feast-dev/feast/go/cmd/server/logging"
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/internal/test"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
Expand Down Expand Up @@ -73,7 +73,7 @@ func getClient(ctx context.Context, offlineStoreType string, basePath string, en
if err != nil {
panic(err)
}
servingServiceServer := newServingServiceServer(fs, loggingService)
servingServiceServer := NewGrpcServingServiceServer(fs, loggingService)

serving.RegisterServingServiceServer(server, servingServiceServer)
go func() {
Expand Down
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def get_online_features(
resp = record_batch_to_online_response(record_batch)
return OnlineResponse(resp)

def start_grpc_server(self, host: str, port: int):
self._service.StartGprcServer(host, port)


def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array:
if isinstance(value, Value_pb2.RepeatedValue):
Expand Down
33 changes: 22 additions & 11 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class FeatureStore:
repo_path: Path
_registry: Registry
_provider: Provider
_go_server: Optional["EmbeddedOnlineFeatureServer"]
_go_server: "EmbeddedOnlineFeatureServer"

@log_exceptions
def __init__(
Expand Down Expand Up @@ -1306,6 +1306,18 @@ def get_online_features(
native_entity_values=True,
)

def _lazy_init_go_server(self):
"""Lazily initialize self._go_server if it hasn't been initialized before."""
from feast.embedded_go.online_features_service import (
EmbeddedOnlineFeatureServer,
)

# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = EmbeddedOnlineFeatureServer(
str(self.repo_path.absolute()), self.config, self
)

def _get_online_features(
self,
features: Union[List[str], FeatureService],
Expand All @@ -1323,15 +1335,7 @@ def _get_online_features(

# If Go feature server is enabled, send request to it instead of going through regular Python logic
if self.config.go_feature_retrieval:
from feast.embedded_go.online_features_service import (
EmbeddedOnlineFeatureServer,
)

# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = EmbeddedOnlineFeatureServer(
str(self.repo_path.absolute()), self.config, self
)
self._lazy_init_go_server()

entity_native_values: Dict[str, List[Any]]
if not native_entity_values:
Expand Down Expand Up @@ -1957,7 +1961,14 @@ def _get_feature_views_to_use(
@log_exceptions_and_usage
def serve(self, host: str, port: int, no_access_log: bool) -> None:
"""Start the feature consumption server locally on a given port."""
feature_server.start_server(self, host, port, no_access_log)
if self.config.go_feature_retrieval:
# Start go server instead of python if the flag is enabled
self._lazy_init_go_server()
# TODO(tsotne) add http/grpc flag in CLI and call appropriate method here depending on that
self._go_server.start_grpc_server(host, port)
else:
# Start the python server if go server isn't enabled
feature_server.start_server(self, host, port, no_access_log)

@log_exceptions_and_usage
def get_feature_server_endpoint(self) -> Optional[str]:
Expand Down

0 comments on commit f3ff812

Please sign in to comment.