Implement half of JobService functionality
Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com>
tsotnet committed Oct 29, 2020
1 parent 4396d0f commit 221badc
Showing 7 changed files with 144 additions and 39 deletions.
@@ -57,6 +57,11 @@ spec:
         {{- end }}

         env:
+          - name: FEAST_CORE_URL
+            value: "{{ .Release.Name }}-feast-core:6565"
+          - name: FEAST_HISTORICAL_SERVING_URL
+            value: "{{ .Release.Name }}-feast-batch-serving:6566"
+
         {{- if .Values.gcpServiceAccount.enabled }}
           - name: GOOGLE_APPLICATION_CREDENTIALS
             value: /etc/secrets/google/{{ .Values.gcpServiceAccount.existingSecret.key }}
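
Note: these chart values are consumed by the SDK's config layer inside the job service container. A minimal sketch, assuming the SDK's FEAST_<OPTION> environment-variable convention (release name and hosts are illustrative):

```python
import os

from feast import Client

# Assumption: the SDK maps FEAST_<OPTION> env vars onto config keys, so these
# mirror the chart values above (host names are illustrative).
os.environ["FEAST_CORE_URL"] = "myrelease-feast-core:6565"
os.environ["FEAST_HISTORICAL_SERVING_URL"] = "myrelease-feast-batch-serving:6566"

client = Client()  # picks up core_url / historical_serving_url from the environment
```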
4 changes: 4 additions & 0 deletions infra/charts/feast/values.yaml
@@ -13,6 +13,10 @@ feast-jupyter:
   # feast-jupyter.enabled -- Flag to install Feast Jupyter Notebook with SDK
   enabled: true

+feast-jobservice:
+  # feast-jobservice.enabled -- Flag to install Feast Job Service
+  enabled: true
+
 postgresql:
   # postgresql.enabled -- Flag to install Postgresql
   enabled: true
8 changes: 7 additions & 1 deletion infra/docker/jobservice/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.7-slim-buster
+FROM jupyter/pyspark-notebook:ae5f7e104dd5

 USER root
 WORKDIR /feast
@@ -27,4 +27,10 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa
     -O /usr/bin/grpc-health-probe && \
     chmod +x /usr/bin/grpc-health-probe

+ENV FEAST_SPARK_LAUNCHER standalone
+ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
+ENV FEAST_SPARK_HOME $SPARK_HOME
+ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
+    --conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
+
 CMD ["feast", "server"]
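
Note: the ENV lines above configure the job service's Spark launcher. A sketch of the equivalent options passed explicitly to the Python client, assuming the FEAST_* variables map to same-named lowercase config keys:

```python
from feast import Client

# Hypothetical equivalent of the Dockerfile ENV lines, passed as config options
# (assumes FEAST_SPARK_LAUNCHER maps to the "spark_launcher" key, and so on).
client = Client(
    spark_launcher="standalone",
    spark_standalone_master="local[*]",
)
```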
4 changes: 2 additions & 2 deletions protos/feast/core/JobService.proto
@@ -126,8 +126,8 @@ message StartOfflineToOnlineIngestionJobResponse {
 }

 message GetHistoricalFeaturesRequest {
-  // List of features that are being retrieved
-  repeated feast.serving.FeatureReferenceV2 features = 1;
+  // List of feature references that are being retrieved
+  repeated string feature_refs = 1;

   // Batch DataSource that can be used to obtain entity values for historical retrieval.
   // For each entity value, a feature value will be retrieved for that value/timestamp
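
Note: with FeatureReferenceV2 gone from this request, features are addressed as plain strings of the form `<feature_table>:<feature>`, for example (illustrative names):

```python
# Feature references are plain strings, "<feature_table>:<feature>":
feature_refs = [
    "driver_statistics:avg_daily_trips",  # illustrative table and feature names
    "driver_statistics:conv_rate",
]
```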
92 changes: 66 additions & 26 deletions sdk/python/feast/client.py
@@ -33,6 +33,7 @@
     CONFIG_ENABLE_AUTH_KEY,
     CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY,
     CONFIG_JOB_SERVICE_ENABLE_SSL_KEY,
+    CONFIG_JOB_SERVICE_ENABLED,
     CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY,
     CONFIG_JOB_SERVICE_URL_KEY,
     CONFIG_PROJECT_KEY,
@@ -67,6 +68,11 @@
 )
 from feast.core.CoreService_pb2_grpc import CoreServiceStub
 from feast.core.JobService_pb2_grpc import JobServiceStub
+from feast.core.JobService_pb2 import (
+    GetHistoricalFeaturesRequest,
+    StartOfflineToOnlineIngestionJobRequest,
+    StartStreamToOnlineIngestionJobRequest,
+)
 from feast.data_format import ParquetFormat
 from feast.data_source import BigQuerySource, FileSource
 from feast.entity import Entity
@@ -190,6 +196,10 @@ def _job_service(self):
         Returns: JobServiceStub
         """
+        # Don't initialize job service stub if the job service is disabled
+        if self._config.get(CONFIG_JOB_SERVICE_ENABLED) == "False":
+            return None
+
         if not self._job_service_stub:
             channel = create_grpc_channel(
                 url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY),
@@ -853,8 +863,9 @@ def get_historical_features(
         self,
         feature_refs: List[str],
         entity_source: Union[pd.DataFrame, FileSource, BigQuerySource],
-        project: str = None,
-    ) -> RetrievalJob:
+        project: Optional[str] = None,
+        destination_path: Optional[str] = None,
+    ) -> Union[RetrievalJob, str]:
         """
         Launch a historical feature retrieval job.
@@ -873,11 +884,12 @@
                 retrieval job.
             project: Specifies the project that contains the feature tables
                 which the requested features belong to.
+            destination_path: Specifies the path in a bucket to write the exported feature data files
         Returns:
-            Returns a retrieval job object that can be used to monitor retrieval
-            progress asynchronously, and can be used to materialize the
-            results.
+            If jobs are launched locally, returns a retrieval job object that can be used to monitor retrieval
+            progress asynchronously, and can be used to materialize the results.
+            Otherwise, if jobs are launched through Feast Job Service, returns a job id.
         Examples:
             >>> from feast import Client
@@ -890,15 +902,6 @@
             >>> output_file_uri = feature_retrieval_job.get_output_file_uri()
                 "gs://some-bucket/output/
         """
-        feature_tables = self._get_feature_tables_from_feature_refs(
-            feature_refs, project
-        )
-        output_location = os.path.join(
-            self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION),
-            str(uuid.uuid4()),
-        )
-        output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
-
         if isinstance(entity_source, pd.DataFrame):
             staging_location = self._config.get(CONFIG_SPARK_STAGING_LOCATION)
             entity_staging_uri = urlparse(
@@ -922,13 +925,29 @@
                 entity_staging_uri.geturl(),
             )

-        return start_historical_feature_retrieval_job(
-            self,
-            entity_source,
-            feature_tables,
-            output_format,
-            os.path.join(output_location, str(uuid.uuid4())),
-        )
+        if destination_path is None:
+            destination_path = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION)
+        destination_path = os.path.join(destination_path, str(uuid.uuid4()))
+
+        if not self._job_service:
+            feature_tables = self._get_feature_tables_from_feature_refs(
+                feature_refs, project
+            )
+            output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
+
+            return start_historical_feature_retrieval_job(
+                self, entity_source, feature_tables, output_format, destination_path
+            )
+        else:
+            request = GetHistoricalFeaturesRequest(
+                feature_refs=feature_refs,
+                entities_source=entity_source.to_proto(),
+                project=project,
+                destination_path=destination_path,
+            )
+            response = self._job_service.GetHistoricalFeatures(request)
+            return response.id

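Note: the method's return type now depends on the launch path. A minimal usage sketch handling both cases (the client, entity DataFrame, feature refs, and bucket path are illustrative):

```python
# Assumes `client` is a configured feast.Client and `entities_df` is a pandas
# DataFrame of entity rows; feature refs and the bucket path are illustrative.
output = client.get_historical_features(
    feature_refs=["driver_statistics:avg_daily_trips"],
    entity_source=entities_df,
    destination_path="gs://some-bucket/output",
)
if isinstance(output, str):
    print("job id from Feast Job Service:", output)
else:
    print(output.get_output_file_uri())  # local RetrievalJob proxy
```
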
     def get_historical_features_df(
         self,
@@ -993,22 +1012,43 @@ def _get_feature_tables_from_feature_refs(

     def start_offline_to_online_ingestion(
         self, feature_table: FeatureTable, start: datetime, end: datetime,
-    ) -> SparkJob:
+    ) -> Union[SparkJob, str]:
         """
         Launch Ingestion Job from Batch Source to Online Store for given featureTable
         :param feature_table: FeatureTable which will be ingested
         :param start: lower datetime boundary
         :param end: upper datetime boundary
-        :return: Spark Job Proxy object
+        :return: Spark Job Proxy object if jobs are launched locally,
+            or Spark Job ID if jobs are launched through Feast Job Service
         """
-        return start_offline_to_online_ingestion(feature_table, start, end, self)
+        if not self._job_service:
+            return start_offline_to_online_ingestion(feature_table, start, end, self)
+        else:
+            request = StartOfflineToOnlineIngestionJobRequest(
+                project=self.project,
+                table_name=feature_table.name,
+            )
+            request.start_date.FromDatetime(start)
+            request.end_date.FromDatetime(end)
+            response = self._job_service.StartOfflineToOnlineIngestionJob(request)
+            return response.id

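Note: a minimal usage sketch of the ingestion entry point above (client and table name are illustrative); the result is a SparkJob proxy when launched locally, or a job id string when routed through the job service:

```python
from datetime import datetime

# Assumes `client` is a configured feast.Client; the table name is illustrative.
table = client.get_feature_table("driver_statistics")
result = client.start_offline_to_online_ingestion(
    table, datetime(2020, 10, 1), datetime(2020, 10, 2)
)
# `result` is a SparkJob proxy locally, or a job id string via the job service.
```
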
     def start_stream_to_online_ingestion(
         self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None,
-    ) -> SparkJob:
-        return start_stream_to_online_ingestion(feature_table, extra_jars or [], self)
+    ) -> Union[SparkJob, str]:
+        if not self._job_service:
+            return start_stream_to_online_ingestion(
+                feature_table, extra_jars or [], self
+            )
+        else:
+            request = StartStreamToOnlineIngestionJobRequest(
+                project=self.project,
+                table_name=feature_table.name,
+            )
+            response = self._job_service.StartStreamToOnlineIngestionJob(request)
+            return response.id

     def stage_dataframe(
         self,
11 changes: 10 additions & 1 deletion sdk/python/feast/constants.py
@@ -52,6 +52,7 @@ class AuthProvider(Enum):
 CONFIG_JOB_SERVICE_URL_KEY = "job_service_url"
 CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl"
 CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert"
+CONFIG_JOB_SERVICE_ENABLED = "job_service_enabled"
 CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default"
 CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply"
 CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = (
@@ -115,7 +116,15 @@
     CONFIG_SERVING_ENABLE_SSL_KEY: "False",
     # Path to certificate(s) to secure connection to Feast Serving
     CONFIG_SERVING_SERVER_SSL_CERT_KEY: "",
-    # Default connection timeout to Feast Serving and Feast Core (in seconds)
+    # Default Feast Job Service URL
+    CONFIG_JOB_SERVICE_URL_KEY: "localhost:6568",
+    # Enable or disable TLS/SSL to Feast Job Service
+    CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
+    # Path to certificate(s) to secure connection to Feast Job Service
+    CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
+    # Enable or disable Feast Job Service
+    CONFIG_JOB_SERVICE_ENABLED: "False",
+    # Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds)
     CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3",
     # Default gRPC connection timeout when sending an ApplyFeatureSet command to
     # Feast Core (in seconds)
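
Note: a sketch of opting in to the job service from the SDK using the new keys above (values are the string defaults from this diff):

```python
from feast import Client

# Route Spark job launches through Feast Job Service instead of launching
# locally; config values are strings, matching the defaults defined above.
client = Client(
    job_service_enabled="True",
    job_service_url="localhost:6568",
)
```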
59 changes: 50 additions & 9 deletions sdk/python/feast/job_service.py
@@ -3,7 +3,27 @@
 import grpc

 import feast
+from feast.constants import (
+    CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT,
+    CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION,
+)
 from feast.core import JobService_pb2_grpc
+from feast.core.JobService_pb2 import (
+    GetHistoricalFeaturesResponse,
+    GetJobResponse,
+    ListJobsResponse,
+    StartOfflineToOnlineIngestionJobResponse,
+    StartStreamToOnlineIngestionJobResponse,
+    StopJobResponse,
+)
+from feast.data_source import DataSource
+from feast.pyspark.launcher import (
+    stage_dataframe,
+    start_historical_feature_retrieval_job,
+    start_historical_feature_retrieval_spark_session,
+    start_offline_to_online_ingestion,
+    start_stream_to_online_ingestion,
+)
 from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
 from feast.third_party.grpc.health.v1.HealthService_pb2 import (
     HealthCheckResponse,
@@ -17,21 +37,42 @@ def __init__(self):

     def StartOfflineToOnlineIngestionJob(self, request, context):
         """Start job to ingest data from offline store into online store"""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details("Method not implemented!")
-        raise NotImplementedError("Method not implemented!")
+        feature_table = self.client.get_feature_table(
+            request.table_name, request.project
+        )
+        job = start_offline_to_online_ingestion(
+            feature_table,
+            request.start_date.ToDatetime(),
+            request.end_date.ToDatetime(),
+            self.client,
+        )
+        return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())

     def GetHistoricalFeatures(self, request, context):
         """Produce a training dataset, return a job id that will provide a file reference"""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details("Method not implemented!")
-        raise NotImplementedError("Method not implemented!")
+        feature_tables = self.client._get_feature_tables_from_feature_refs(
+            request.feature_refs, request.project
+        )
+        output_format = self.client._config.get(
+            CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT
+        )
+
+        job = start_historical_feature_retrieval_job(
+            self.client,
+            DataSource.from_proto(request.entities_source),
+            feature_tables,
+            output_format,
+            request.destination_path,
+        )
+        return GetHistoricalFeaturesResponse(id=job.get_id())

     def StartStreamToOnlineIngestionJob(self, request, context):
         """Start job to ingest data from stream into online store"""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details("Method not implemented!")
-        raise NotImplementedError("Method not implemented!")
+        feature_table = self.client.get_feature_table(
+            request.table_name, request.project
+        )
+        job = start_stream_to_online_ingestion(feature_table, [], self.client)
+        return StartStreamToOnlineIngestionJobResponse(id=job.get_id())

     def ListJobs(self, request, context):
         """List all types of jobs"""
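
Note: the service above can also be exercised directly over gRPC. A minimal sketch against the default port from constants.py (project and table names are illustrative):

```python
from datetime import datetime

import grpc

from feast.core.JobService_pb2 import StartOfflineToOnlineIngestionJobRequest
from feast.core.JobService_pb2_grpc import JobServiceStub

# Default job service address per constants.py; project/table are illustrative.
channel = grpc.insecure_channel("localhost:6568")
stub = JobServiceStub(channel)

request = StartOfflineToOnlineIngestionJobRequest(
    project="default", table_name="driver_statistics"
)
request.start_date.FromDatetime(datetime(2020, 10, 1))
request.end_date.FromDatetime(datetime(2020, 10, 2))

response = stub.StartOfflineToOnlineIngestionJob(request)
print("started job:", response.id)
```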
