Use dictionary instead of class to avoid mandatory pyspark dependencies for Feast SDK

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed Oct 12, 2020
1 parent f29f326 commit 7c5025d
Showing 1 changed file with 39 additions and 55 deletions.
94 changes: 39 additions & 55 deletions sdk/python/feast/pyspark/launchers.py
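
The commit drops the import of EntitySource, FeatureTable, FeatureTableSource and FileDestination from feast.pyspark.historical_feature_retrieval_job (the module that actually needs pyspark) and lets the launchers accept plain dictionaries instead, so the SDK-side launcher code no longer pulls in pyspark just to describe a job. A minimal, hypothetical way to check that effect in an environment without pyspark installed (not part of the commit) might be:

# Hypothetical check in an environment where pyspark is NOT installed.
# After this commit, importing the launcher module should no longer require
# pyspark, while the retrieval job module itself (which genuinely needs
# pyspark) may still fail to import. Other optional dependencies such as
# the google-cloud libraries are out of scope of this check.
import importlib

importlib.import_module("feast.pyspark.launchers")
try:
    importlib.import_module("feast.pyspark.historical_feature_retrieval_job")
except ImportError as exc:
    print(f"job module still requires pyspark: {exc}")
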
@@ -2,16 +2,9 @@
import json
import os
import subprocess
from typing import List
from typing import Dict, List
from urllib.parse import urlparse

from feast.pyspark.historical_feature_retrieval_job import (
EntitySource,
FeatureTable,
FeatureTableSource,
FileDestination,
)


class SparkJobFailure(Exception):
"""
@@ -68,12 +61,9 @@ def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str)
This is the returned historical feature retrieval job result for StandaloneClusterLauncher.
Args:
job_id (str):
Historical feature retrieval job id.
process (subprocess.Popen):
Pyspark driver process, spawned by the launcher.
output_file_uri (str):
Uri to the historical feature retrieval job output file.
job_id (str): Historical feature retrieval job id.
process (subprocess.Popen): Pyspark driver process, spawned by the launcher.
output_file_uri (str): Uri to the historical feature retrieval job output file.
"""
self.job_id = job_id
self._process = process
@@ -108,12 +98,10 @@ def __init__(self, job_id, operation, output_file_uri):
This is the returned historical feature retrieval job result for DataprocClusterLauncher.
Args:
job_id (str):
Historical feature retrieval job id.
operation (google.api.core.operation.Operation):
A Future for the spark job result, returned by the dataproc client.
output_file_uri (str):
Uri to the historical feature retrieval job output file.
job_id (str): Historical feature retrieval job id.
operation (google.api.core.operation.Operation): A Future for the spark job result,
returned by the dataproc client.
output_file_uri (str): Uri to the historical feature retrieval job output file.
"""
self.job_id = job_id
self._operation = operation
@@ -135,35 +123,29 @@ class JobLauncher(abc.ABC):
Submits spark jobs to a spark cluster. Currently supports only historical feature retrieval jobs.
"""

# entity_source: Source,
# feature_tables_sources: List[Source],
# feature_tables: List[FeatureTable],
# destination

@abc.abstractmethod
def historical_feature_retrieval(
self,
pyspark_script: str,
entity_source: EntitySource,
feature_tables_sources: List[FeatureTableSource],
feature_tables: List[FeatureTable],
destination: FileDestination,
entity_source_conf: Dict,
feature_tables_sources_conf: List[Dict],
feature_tables_conf: List[Dict],
destination_conf: Dict,
job_id: str,
**kwargs,
) -> RetrievalJob:
"""
Submits a historical feature retrieval job to a Spark cluster.
Args:
pyspark_script (str):
Local file path to the pyspark script for historical feature retrieval.
entity_source (EntitySource): Entity data source.
feature_tables_sources (FeatureTableSource): List of feature tables data sources.
feature_tables (List[FeatureTable]): List of feature table specification.
pyspark_script (str): Local file path to the pyspark script for historical feature
retrieval.
entity_source_conf (Dict): Entity data source configuration.
feature_tables_sources_conf (List[Dict]): List of feature table data source configurations.
feature_tables_conf (List[Dict]): List of feature table specifications.
The order of the feature table must correspond to that of feature_tables_sources.
destination (FileDestination): Retrieval job output destination.
job_id (str):
A job id that is unique for each job submission.
destination_conf (Dict): Retrieval job output destination.
job_id (str): A job id that is unique for each job submission.
Raises:
SparkJobFailure: The spark job submission failed, encountered error
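
The Args section above documents the new dict-based parameters. A hedged sketch of what these configurations might look like when calling a launcher follows; only the "path" key of destination_conf is confirmed by this diff (the launchers read destination_conf["path"]), so the other keys and values are illustrative assumptions.

# Illustrative config dicts for the new interface. Only destination_conf["path"]
# is confirmed by this commit; every other key here is an assumption.
entity_source_conf = {"format": "csv", "path": "file:///data/entities.csv"}
feature_tables_sources_conf = [{"format": "parquet", "path": "file:///data/driver_statistics"}]
feature_tables_conf = [{"name": "driver_statistics", "features": ["trips_today"]}]
destination_conf = {"format": "parquet", "path": "file:///data/output"}

# A concrete launcher would then be invoked with keyword arguments matching
# the abstract signature above, for example:
# job = launcher.historical_feature_retrieval(
#     pyspark_script="historical_feature_retrieval_job.py",
#     entity_source_conf=entity_source_conf,
#     feature_tables_sources_conf=feature_tables_sources_conf,
#     feature_tables_conf=feature_tables_conf,
#     destination_conf=destination_conf,
#     job_id="historical-retrieval-001",
# )
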
@@ -202,13 +184,14 @@ def spark_submit_script_path(self):
def historical_feature_retrieval(
self,
pyspark_script: str,
entity_source: EntitySource,
feature_tables_sources: List[FeatureTableSource],
feature_tables: List[FeatureTable],
destination: FileDestination,
entity_source_conf: Dict,
feature_tables_sources_conf: List[Dict],
feature_tables_conf: List[Dict],
destination_conf: Dict,
job_id: str,
**kwargs,
) -> RetrievalJob:

submission_cmd = [
self.spark_submit_script_path,
"--master",
@@ -217,17 +200,17 @@ def historical_feature_retrieval(
job_id,
pyspark_script,
"--feature-tables",
json.dumps([ft._asdict() for ft in feature_tables]),
json.dumps(feature_tables_conf),
"--feature-tables-sources",
json.dumps([fts._asdict() for fts in feature_tables_sources]),
json.dumps(feature_tables_sources_conf),
"--entity-source",
json.dumps(entity_source._asdict()),
json.dumps(entity_source_conf),
"--destination",
json.dumps(destination._asdict()),
json.dumps(destination_conf),
]

process = subprocess.Popen(submission_cmd, shell=True)
output_file = destination.path
output_file = destination_conf["path"]
return StandaloneClusterRetrievalJob(job_id, process, output_file)
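
The standalone launcher above serializes each configuration dict with json.dumps and passes it on the command line (--feature-tables, --feature-tables-sources, --entity-source, --destination); the Dataproc launcher below does the same through the Dataproc job spec. The pyspark script receiving these arguments would deserialize them back into dicts; a hypothetical sketch of that counterpart (not part of this diff) is:

# Hypothetical counterpart inside the pyspark script. The flag names match the
# ones built by the launchers, but this parsing code is an assumption, not
# taken from this commit.
import argparse
import json

parser = argparse.ArgumentParser()
parser.add_argument("--feature-tables")
parser.add_argument("--feature-tables-sources")
parser.add_argument("--entity-source")
parser.add_argument("--destination")
args = parser.parse_args()

feature_tables_conf = json.loads(args.feature_tables)
feature_tables_sources_conf = json.loads(args.feature_tables_sources)
entity_source_conf = json.loads(args.entity_source)
destination_conf = json.loads(args.destination)
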


@@ -288,13 +271,14 @@ def _stage_files(self, pyspark_script: str, job_id: str) -> str:
def historical_feature_retrieval(
self,
pyspark_script: str,
entity_source: EntitySource,
feature_tables_sources: List[FeatureTableSource],
feature_tables: List[FeatureTable],
destination: FileDestination,
entity_source_conf: Dict,
feature_tables_sources_conf: List[Dict],
feature_tables_conf: List[Dict],
destination_conf: Dict,
job_id: str,
**kwargs,
) -> RetrievalJob:

pyspark_gcs = self._stage_files(pyspark_script, job_id)
job = {
"reference": {"job_id": job_id},
@@ -303,18 +287,18 @@ def historical_feature_retrieval(
"main_python_file_uri": pyspark_gcs,
"args": [
"--feature-tables",
json.dumps([ft._asdict() for ft in feature_tables]),
json.dumps(feature_tables_conf),
"--feature-tables-sources",
json.dumps([fts._asdict() for fts in feature_tables_sources]),
json.dumps(feature_tables_sources_conf),
"--entity-source",
json.dumps(entity_source._asdict()),
json.dumps(entity_source_conf),
"--destination",
json.dumps(destination._asdict()),
json.dumps(destination_conf),
],
},
}
operation = self.job_client.submit_job_as_operation(
request={"project_id": self.project_id, "region": self.region, "job": job}
)
output_file = destination.path
output_file = destination_conf["path"]
return DataprocRetrievalJob(job_id, operation, output_file)
