From 221badc95517c7b7ddb45b65b3f07a62581df661 Mon Sep 17 00:00:00 2001
From: Tsotne Tabidze
Date: Thu, 29 Oct 2020 09:54:35 +0400
Subject: [PATCH] Implement half of JobService functionality

Signed-off-by: Tsotne Tabidze
---
 .../templates/deployment.yaml      |  5 +
 infra/charts/feast/values.yaml     |  4 +
 infra/docker/jobservice/Dockerfile |  8 +-
 protos/feast/core/JobService.proto |  4 +-
 sdk/python/feast/client.py         | 92 +++++++++++++------
 sdk/python/feast/constants.py      | 11 ++-
 sdk/python/feast/job_service.py    | 59 ++++++++++--
 7 files changed, 144 insertions(+), 39 deletions(-)

diff --git a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml
index 5a3429c5f1..9ffd402363 100644
--- a/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml
+++ b/infra/charts/feast/charts/feast-jobservice/templates/deployment.yaml
@@ -57,6 +57,11 @@ spec:
         {{- end }}

       env:
+        - name: FEAST_CORE_URL
+          value: "{{ .Release.Name }}-feast-core:6565"
+        - name: FEAST_HISTORICAL_SERVING_URL
+          value: "{{ .Release.Name }}-feast-batch-serving:6566"
+
         {{- if .Values.gcpServiceAccount.enabled }}
         - name: GOOGLE_APPLICATION_CREDENTIALS
           value: /etc/secrets/google/{{ .Values.gcpServiceAccount.existingSecret.key }}
diff --git a/infra/charts/feast/values.yaml b/infra/charts/feast/values.yaml
index c3658f88a2..c06f44c713 100644
--- a/infra/charts/feast/values.yaml
+++ b/infra/charts/feast/values.yaml
@@ -13,6 +13,10 @@ feast-jupyter:
   # feast-jupyter.enabled -- Flag to install Feast Jupyter Notebook with SDK
   enabled: true

+feast-jobservice:
+  # feast-jobservice.enabled -- Flag to install Feast Job Service
+  enabled: true
+
 postgresql:
   # postgresql.enabled -- Flag to install Postgresql
   enabled: true
diff --git a/infra/docker/jobservice/Dockerfile b/infra/docker/jobservice/Dockerfile
index 29a3d97f90..4eb000441b 100644
--- a/infra/docker/jobservice/Dockerfile
+++ b/infra/docker/jobservice/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.7-slim-buster
+FROM jupyter/pyspark-notebook:ae5f7e104dd5

 USER root
 WORKDIR /feast
@@ -27,4 +27,10 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa
     -O /usr/bin/grpc-health-probe && \
     chmod +x /usr/bin/grpc-health-probe

+ENV FEAST_SPARK_LAUNCHER standalone
+ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
+ENV FEAST_SPARK_HOME $SPARK_HOME
+ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
+--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
+
 CMD ["feast", "server"]
diff --git a/protos/feast/core/JobService.proto b/protos/feast/core/JobService.proto
index 861f3b74a5..3ec6aea60c 100644
--- a/protos/feast/core/JobService.proto
+++ b/protos/feast/core/JobService.proto
@@ -126,8 +126,8 @@ message StartOfflineToOnlineIngestionJobResponse {
 }

 message GetHistoricalFeaturesRequest {
-  // List of features that are being retrieved
-  repeated feast.serving.FeatureReferenceV2 features = 1;
+  // List of feature references that are being retrieved
+  repeated string feature_refs = 1;

   // Batch DataSource that can be used to obtain entity values for historical retrieval.
   // For each entity value, a feature value will be retrieved for that value/timestamp
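
The request message now carries plain string feature references instead of FeatureReferenceV2 values. As a rough sketch of what a caller builds against the regenerated stubs (the feature ref, project, and path below are placeholders, not values from the patch):

    # Sketch only: all field values are placeholders. The entities_source field
    # (a DataSource proto) is omitted for brevity but is needed for a real retrieval.
    from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest

    request = GetHistoricalFeaturesRequest(
        feature_refs=["driver_statistics:avg_daily_trips"],
        project="default",
        destination_path="gs://some-bucket/output",
    )
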
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 9355539067..7867331dc3 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -33,6 +33,7 @@
     CONFIG_ENABLE_AUTH_KEY,
     CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY,
     CONFIG_JOB_SERVICE_ENABLE_SSL_KEY,
+    CONFIG_JOB_SERVICE_ENABLED,
     CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY,
     CONFIG_JOB_SERVICE_URL_KEY,
     CONFIG_PROJECT_KEY,
@@ -67,6 +68,11 @@
 )
 from feast.core.CoreService_pb2_grpc import CoreServiceStub
 from feast.core.JobService_pb2_grpc import JobServiceStub
+from feast.core.JobService_pb2 import (
+    GetHistoricalFeaturesRequest,
+    StartOfflineToOnlineIngestionJobRequest,
+    StartStreamToOnlineIngestionJobRequest,
+)
 from feast.data_format import ParquetFormat
 from feast.data_source import BigQuerySource, FileSource
 from feast.entity import Entity
@@ -190,6 +196,10 @@ def _job_service(self):

         Returns: JobServiceStub
         """
+        # Don't initialize job service stub if the job service is disabled
+        if self._config.get(CONFIG_JOB_SERVICE_ENABLED) == "False":
+            return None
+
         if not self._job_service_stub:
             channel = create_grpc_channel(
                 url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY),
@@ -853,8 +863,9 @@ def get_historical_features(
         self,
         feature_refs: List[str],
         entity_source: Union[pd.DataFrame, FileSource, BigQuerySource],
-        project: str = None,
-    ) -> RetrievalJob:
+        project: Optional[str] = None,
+        destination_path: Optional[str] = None,
+    ) -> Union[RetrievalJob, str]:
         """
         Launch a historical feature retrieval job.

@@ -873,11 +884,12 @@ def get_historical_features(
                 retrieval job.
             project: Specifies the project that contains the feature tables
                 which the requested features belong to.
+            destination_path: Specifies the path in a bucket to write the exported feature data files

         Returns:
-            Returns a retrieval job object that can be used to monitor retrieval
-            progress asynchronously, and can be used to materialize the
-            results.
+            If jobs are launched locally, returns a retrieval job object that can be used to monitor retrieval
+            progress asynchronously, and can be used to materialize the results.
+            Otherwise, if jobs are launched through Feast Job Service, returns a job id.

         Examples:
             >>> from feast import Client
             >>> from datetime import datetime
             >>> feast_client = Client(core_url="localhost:6565")
             >>> feature_refs = ["bookings:bookings_7d", "bookings:booking_14d"]
             >>> entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
             >>> feature_retrieval_job = feast_client.get_historical_features(
             >>>     feature_refs, entity_source, project="my_project")
             >>> output_file_uri = feature_retrieval_job.get_output_file_uri()
             "gs://some-bucket/output/"
         """
-        feature_tables = self._get_feature_tables_from_feature_refs(
-            feature_refs, project
-        )
-        output_location = os.path.join(
-            self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION),
-            str(uuid.uuid4()),
-        )
-        output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
-
         if isinstance(entity_source, pd.DataFrame):
             staging_location = self._config.get(CONFIG_SPARK_STAGING_LOCATION)
             entity_staging_uri = urlparse(
@@ -922,13 +925,29 @@ def get_historical_features(
                 entity_staging_uri.geturl(),
             )

-        return start_historical_feature_retrieval_job(
-            self,
-            entity_source,
-            feature_tables,
-            output_format,
-            os.path.join(output_location, str(uuid.uuid4())),
-        )
+        if destination_path is None:
+            destination_path = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION)
+        destination_path = os.path.join(destination_path, str(uuid.uuid4()))
+
+        if not self._job_service:
+            feature_tables = self._get_feature_tables_from_feature_refs(
+                feature_refs, project
+            )
+            output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
+
+            return start_historical_feature_retrieval_job(
+                self, entity_source, feature_tables, output_format, destination_path
+            )
+        else:
+            request = GetHistoricalFeaturesRequest(
+                feature_refs=feature_refs,
+                entities_source=entity_source.to_proto(),
+                project=project,
+                destination_path=destination_path,
+            )
+            response = self._job_service.GetHistoricalFeatures(request)
+            return response.id

     def get_historical_features_df(
         self,
@@ -993,7 +1012,7 @@ def _get_feature_tables_from_feature_refs(

     def start_offline_to_online_ingestion(
         self, feature_table: FeatureTable, start: datetime, end: datetime,
-    ) -> SparkJob:
+    ) -> Union[SparkJob, str]:
         """
         Launch Ingestion Job from Batch Source to Online Store for given featureTable

@@ -1001,14 +1020,35 @@ def start_offline_to_online_ingestion(
         :param feature_table: FeatureTable which will be ingested
         :param start: lower datetime boundary
         :param end: upper datetime boundary
-        :return: Spark Job Proxy object
+        :return: Spark Job Proxy object if jobs are launched locally,
+            or Spark Job ID if jobs are launched through Feast Job Service
         """
-        return start_offline_to_online_ingestion(feature_table, start, end, self)
+        if not self._job_service:
+            return start_offline_to_online_ingestion(feature_table, start, end, self)
+        else:
+            request = StartOfflineToOnlineIngestionJobRequest(
+                project=self.project,
+                table_name=feature_table.name,
+            )
+            request.start_date.FromDatetime(start)
+            request.end_date.FromDatetime(end)
+            response = self._job_service.StartOfflineToOnlineIngestionJob(request)
+            return response.id

     def start_stream_to_online_ingestion(
         self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None,
-    ) -> SparkJob:
-        return start_stream_to_online_ingestion(feature_table, extra_jars or [], self)
+    ) -> Union[SparkJob, str]:
+        if not self._job_service:
+            return start_stream_to_online_ingestion(
+                feature_table, extra_jars or [], self
+            )
+        else:
+            request = StartStreamToOnlineIngestionJobRequest(
+                project=self.project,
+                table_name=feature_table.name,
+            )
+            response = self._job_service.StartStreamToOnlineIngestionJob(request)
+            return response.id

     def stage_dataframe(
         self,
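
Each of the three client calls above now branches on self._job_service: with the job service disabled (the default), the SDK launches Spark jobs in-process exactly as before; with it enabled, the call is proxied over gRPC and a job id string comes back instead of a job object. A minimal sketch of the remote path, assuming a running job service and the Client(options=...) constructor; the URLs, refs, and paths are placeholders:

    # Sketch only: config keys match those added in constants.py below;
    # all other values are placeholders.
    from feast import Client
    from feast.data_format import ParquetFormat
    from feast.data_source import FileSource

    client = Client(
        core_url="localhost:6565",
        options={"job_service_enabled": "True", "job_service_url": "localhost:6568"},
    )
    entity_source = FileSource("event_timestamp", ParquetFormat(), "gs://some-bucket/customer")
    # Proxied to the job service, so this returns a job id (str), not a RetrievalJob.
    job_id = client.get_historical_features(
        ["bookings:bookings_7d"], entity_source, destination_path="gs://some-bucket/output"
    )
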
diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index c62ca88d77..ae3e49e83e 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -52,6 +52,7 @@ class AuthProvider(Enum):
 CONFIG_JOB_SERVICE_URL_KEY = "job_service_url"
 CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl"
 CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert"
+CONFIG_JOB_SERVICE_ENABLED = "job_service_enabled"
 CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default"
 CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply"
 CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = (
@@ -115,7 +116,15 @@ class AuthProvider(Enum):
     CONFIG_SERVING_ENABLE_SSL_KEY: "False",
     # Path to certificate(s) to secure connection to Feast Serving
     CONFIG_SERVING_SERVER_SSL_CERT_KEY: "",
-    # Default connection timeout to Feast Serving and Feast Core (in seconds)
+    # Default Feast Job Service URL
+    CONFIG_JOB_SERVICE_URL_KEY: "localhost:6568",
+    # Enable or disable TLS/SSL to Feast Job Service
+    CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
+    # Path to certificate(s) to secure connection to Feast Job Service
+    CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
+    # Enable or disable Feast Job Service
+    CONFIG_JOB_SERVICE_ENABLED: "False",
+    # Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds)
     CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3",
     # Default gRPC connection timeout when sending an ApplyFeatureSet command to
     # Feast Core (in seconds)
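
These defaults keep the job service off unless explicitly opted in. Presumably the new keys can also be set through FEAST_-prefixed environment variables, following the same convention the chart and Dockerfile above already rely on (FEAST_CORE_URL, FEAST_SPARK_LAUNCHER, and so on); a sketch under that assumption:

    # Sketch only: assumes the SDK maps FEAST_<UPPERCASED_KEY> env vars onto
    # config keys; the host below is a placeholder.
    import os

    os.environ["FEAST_JOB_SERVICE_ENABLED"] = "True"
    os.environ["FEAST_JOB_SERVICE_URL"] = "jobservice.example.internal:6568"
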
diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py
index 53acb63066..c0f56489b9 100644
--- a/sdk/python/feast/job_service.py
+++ b/sdk/python/feast/job_service.py
@@ -3,7 +3,27 @@
 import grpc

 import feast
+from feast.constants import (
+    CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT,
+    CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION,
+)
 from feast.core import JobService_pb2_grpc
+from feast.core.JobService_pb2 import (
+    GetHistoricalFeaturesResponse,
+    GetJobResponse,
+    ListJobsResponse,
+    StartOfflineToOnlineIngestionJobResponse,
+    StartStreamToOnlineIngestionJobResponse,
+    StopJobResponse,
+)
+from feast.data_source import DataSource
+from feast.pyspark.launcher import (
+    stage_dataframe,
+    start_historical_feature_retrieval_job,
+    start_historical_feature_retrieval_spark_session,
+    start_offline_to_online_ingestion,
+    start_stream_to_online_ingestion,
+)
 from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
 from feast.third_party.grpc.health.v1.HealthService_pb2 import (
     HealthCheckResponse,
@@ -17,21 +37,42 @@ def __init__(self):

     def StartOfflineToOnlineIngestionJob(self, request, context):
         """Start job to ingest data from offline store into online store"""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details("Method not implemented!")
-        raise NotImplementedError("Method not implemented!")
+        feature_table = self.client.get_feature_table(
+            request.table_name, request.project
+        )
+        job = start_offline_to_online_ingestion(
+            feature_table,
+            request.start_date.ToDatetime(),
+            request.end_date.ToDatetime(),
+            self.client,
+        )
+        return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())

     def GetHistoricalFeatures(self, request, context):
         """Produce a training dataset, return a job id that will provide a file reference"""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details("Method not implemented!")
-        raise NotImplementedError("Method not implemented!")
+        feature_tables = self.client._get_feature_tables_from_feature_refs(
+            request.feature_refs, request.project
+        )
+        output_format = self.client._config.get(
+            CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT
+        )
+
+        job = start_historical_feature_retrieval_job(
+            self.client,
+            DataSource.from_proto(request.entities_source),
+            feature_tables,
+            output_format,
+            request.destination_path,
+        )
+        return GetHistoricalFeaturesResponse(id=job.get_id())

     def StartStreamToOnlineIngestionJob(self, request, context):
         """Start job to ingest data from stream into online store"""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details("Method not implemented!")
-        raise NotImplementedError("Method not implemented!")
+        feature_table = self.client.get_feature_table(
+            request.table_name, request.project
+        )
+        job = start_stream_to_online_ingestion(feature_table, [], self.client)
+        return StartStreamToOnlineIngestionJobResponse(id=job.get_id())

     def ListJobs(self, request, context):
         """List all types of jobs"""
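
This is the "half" in the commit title: StartOfflineToOnlineIngestionJob, GetHistoricalFeatures, and StartStreamToOnlineIngestionJob now do real work, while ListJobs and the remaining stubs still answer UNIMPLEMENTED. A quick smoke test against a locally running job service might look like the following (project and table names are placeholders):

    # Sketch only: assumes the `feast server` container from the Dockerfile above
    # is listening on the default port from constants.py and the named feature
    # table has been applied.
    from datetime import datetime

    import grpc
    from feast.core.JobService_pb2 import StartOfflineToOnlineIngestionJobRequest
    from feast.core.JobService_pb2_grpc import JobServiceStub

    stub = JobServiceStub(grpc.insecure_channel("localhost:6568"))
    request = StartOfflineToOnlineIngestionJobRequest(
        project="default", table_name="driver_statistics"  # placeholders
    )
    request.start_date.FromDatetime(datetime(2020, 10, 1))
    request.end_date.FromDatetime(datetime(2020, 10, 2))
    # The server proxies to the Spark launcher and returns the launched job's id.
    print(stub.StartOfflineToOnlineIngestionJob(request).id)
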