From 3335ae424701ecf8e9778a371d594e7508c6789e Mon Sep 17 00:00:00 2001
From: Tsotne Tabidze
Date: Tue, 3 Nov 2020 14:43:59 +0400
Subject: [PATCH] Remove spark extra configs and hardcode spark jars/conf in
 standalone mode

Signed-off-by: Tsotne Tabidze
---
 infra/docker/jobservice/Dockerfile | 2 --
 infra/docker/jupyter/Dockerfile | 2 --
 sdk/python/feast/constants.py | 3 ---
 sdk/python/feast/pyspark/abc.py | 24 -------------------
 sdk/python/feast/pyspark/launcher.py | 4 ----
 .../pyspark/launchers/standalone/local.py | 21 +++++++++-------
 sdk/python/feast/remote_job.py | 7 +++++-
 7 files changed, 19 insertions(+), 44 deletions(-)

diff --git a/infra/docker/jobservice/Dockerfile b/infra/docker/jobservice/Dockerfile
index 4eb000441b..885b6c52e0 100644
--- a/infra/docker/jobservice/Dockerfile
+++ b/infra/docker/jobservice/Dockerfile
@@ -30,7 +30,5 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa
 ENV FEAST_SPARK_LAUNCHER standalone
 ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
 ENV FEAST_SPARK_HOME $SPARK_HOME
-ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
---conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
 
 CMD ["feast", "server"]
diff --git a/infra/docker/jupyter/Dockerfile b/infra/docker/jupyter/Dockerfile
index 6396784a38..69aa3622ca 100644
--- a/infra/docker/jupyter/Dockerfile
+++ b/infra/docker/jupyter/Dockerfile
@@ -29,7 +29,5 @@ COPY examples .
 ENV FEAST_SPARK_LAUNCHER standalone
 ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
 ENV FEAST_SPARK_HOME $SPARK_HOME
-ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
---conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
 
 CMD ["start-notebook.sh", "--NotebookApp.token=''"]
\ No newline at end of file
diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index 9588032099..ddb1cea7de 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -93,8 +93,6 @@ class AuthProvider(Enum):
 CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
 CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location"
 
-CONFIG_SPARK_EXTRA_OPTIONS = "spark_extra_options"
-
 # Configuration option default values
 FEAST_DEFAULT_OPTIONS = {
     # Default Feast project to use
@@ -136,5 +134,4 @@ class AuthProvider(Enum):
     CONFIG_REDIS_PORT: "6379",
     CONFIG_REDIS_SSL: "False",
     CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet",
-    CONFIG_SPARK_EXTRA_OPTIONS: "",
 }
diff --git a/sdk/python/feast/pyspark/abc.py b/sdk/python/feast/pyspark/abc.py
index d350d764c9..127112a06f 100644
--- a/sdk/python/feast/pyspark/abc.py
+++ b/sdk/python/feast/pyspark/abc.py
@@ -113,15 +113,6 @@ def get_arguments(self) -> List[str]:
         """
         raise NotImplementedError
 
-    @abc.abstractmethod
-    def get_extra_options(self) -> str:
-        """
-        Spark job dependencies (expected to resolved from maven)
-        Returns:
-            str: Spark job dependencies.
-        """
-        raise NotImplementedError
-
 
 class RetrievalJobParameters(SparkJobParameters):
     def __init__(
@@ -130,7 +121,6 @@ def __init__(
         feature_tables_sources: List[Dict],
         entity_source: Dict,
         destination: Dict,
-        extra_options: str = "",
     ):
         """
         Args:
@@ -242,7 +232,6 @@ def __init__(
         self._feature_tables_sources = feature_tables_sources
         self._entity_source = entity_source
         self._destination = destination
-        self._extra_options = extra_options
 
     def get_name(self) -> str:
         all_feature_tables_names = [ft["name"] for ft in self._feature_tables]
@@ -271,9 +260,6 @@ def get_arguments(self) -> List[str]:
     def get_destination_path(self) -> str:
         return self._destination["path"]
 
-    def get_extra_options(self) -> str:
-        return self._extra_options
-
 
 class RetrievalJob(SparkJob):
     """
@@ -315,7 +301,6 @@ def __init__(
         redis_host: str,
         redis_port: int,
         redis_ssl: bool,
-        extra_options: str = "",
     ):
         self._feature_table = feature_table
         self._source = source
@@ -325,7 +310,6 @@ def __init__(
         self._redis_host = redis_host
         self._redis_port = redis_port
         self._redis_ssl = redis_ssl
-        self._extra_options = extra_options
 
     def get_name(self) -> str:
         return (
@@ -364,9 +348,6 @@ def get_arguments(self) -> List[str]:
             json.dumps(self._get_redis_config()),
         ]
 
-    def get_extra_options(self) -> str:
-        return self._extra_options
-
 
 class StreamIngestionJobParameters(SparkJobParameters):
     def __init__(
@@ -378,7 +359,6 @@ def __init__(
         redis_host: str,
         redis_port: int,
         redis_ssl: bool,
-        extra_options="",
     ):
         self._feature_table = feature_table
         self._source = source
@@ -387,7 +367,6 @@ def __init__(
         self._redis_host = redis_host
         self._redis_port = redis_port
         self._redis_ssl = redis_ssl
-        self._extra_options = extra_options
 
     def get_name(self) -> str:
         return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"
@@ -422,9 +401,6 @@ def get_arguments(self) -> List[str]:
             json.dumps(self._get_redis_config()),
         ]
 
-    def get_extra_options(self) -> str:
-        return self._extra_options
-
 
 class BatchIngestionJob(SparkJob):
     """
diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py
index 8260383fbc..08b5ca5b21 100644
--- a/sdk/python/feast/pyspark/launcher.py
+++ b/sdk/python/feast/pyspark/launcher.py
@@ -16,7 +16,6 @@
     CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH,
     CONFIG_SPARK_EMR_LOG_LOCATION,
     CONFIG_SPARK_EMR_REGION,
-    CONFIG_SPARK_EXTRA_OPTIONS,
     CONFIG_SPARK_HOME,
     CONFIG_SPARK_INGESTION_JOB_JAR,
     CONFIG_SPARK_LAUNCHER,
@@ -192,7 +191,6 @@ def start_historical_feature_retrieval_job(
                 for feature_table in feature_tables
             ],
             destination={"format": output_format, "path": output_path},
-            extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
         )
     )
 
@@ -256,7 +254,6 @@ def start_offline_to_online_ingestion(
            redis_host=client._config.get(CONFIG_REDIS_HOST),
            redis_port=client._config.getint(CONFIG_REDIS_PORT),
            redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
-           extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
        )
    )
 
@@ -277,7 +274,6 @@ def start_stream_to_online_ingestion(
            redis_host=client._config.get(CONFIG_REDIS_HOST),
            redis_port=client._config.getint(CONFIG_REDIS_PORT),
            redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
-           extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
        )
    )
 
diff --git a/sdk/python/feast/pyspark/launchers/standalone/local.py b/sdk/python/feast/pyspark/launchers/standalone/local.py
index 995b702073..821a962ed3 100644
--- a/sdk/python/feast/pyspark/launchers/standalone/local.py
+++ b/sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -3,7 +3,7 @@
 import subprocess
 import uuid
 from contextlib import closing
-from typing import List
+from typing import Dict, List
 
 import requests
 from requests.exceptions import RequestException
@@ -24,7 +24,7 @@
 
 # In-memory cache of Spark jobs
 # This is necessary since we can't query Spark jobs in local mode
-JOB_CACHE = {}
+JOB_CACHE: Dict[str, SparkJob] = {}
 
 
 def _find_free_port():
@@ -204,12 +204,17 @@ def spark_submit(
                 "spark.sql.session.timeZone=UTC",  # ignore local timezone
                 "--packages",
                 f"com.google.cloud.spark:spark-bigquery-with-dependencies_{self.BQ_CONNECTOR_VERSION}",
+                "--jars",
+                "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar,"
+                "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar,"
+                "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar",
+                "--conf",
+                "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem",
+                "--conf",
+                "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
             ]
         )
 
-        if job_params.get_extra_options():
-            submission_cmd.extend(job_params.get_extra_options().split(" "))
-
         submission_cmd.append(job_params.get_main_file_path())
         submission_cmd.extend(job_params.get_arguments())
 
@@ -225,7 +230,7 @@ def historical_feature_retrieval(
             self.spark_submit(job_params),
             job_params.get_destination_path(),
         )
-        JOB_CACHE[job_id] = job  # type: ignore
+        JOB_CACHE[job_id] = job
         return job
 
     def offline_to_online_ingestion(
@@ -239,7 +244,7 @@ def offline_to_online_ingestion(
             self.spark_submit(ingestion_job_params, ui_port),
             ui_port,
         )
-        JOB_CACHE[job_id] = job  # type: ignore
+        JOB_CACHE[job_id] = job
         return job
 
     def start_stream_to_online_ingestion(
@@ -253,7 +258,7 @@ def start_stream_to_online_ingestion(
             self.spark_submit(ingestion_job_params, ui_port),
             ui_port,
         )
-        JOB_CACHE[job_id] = job  # type: ignore
+        JOB_CACHE[job_id] = job
        return job
 
     def stage_dataframe(self, df, event_timestamp_column: str):
diff --git a/sdk/python/feast/remote_job.py b/sdk/python/feast/remote_job.py
index 029b86e55f..0a766cf796 100644
--- a/sdk/python/feast/remote_job.py
+++ b/sdk/python/feast/remote_job.py
@@ -8,10 +8,10 @@
 from feast.pyspark.abc import (
     BatchIngestionJob,
     RetrievalJob,
+    SparkJob,
     SparkJobFailure,
     SparkJobStatus,
     StreamIngestionJob,
-    SparkJob,
 )
 
 GrpcExtraParamProvider = Callable[[], Dict[str, Any]]
@@ -156,3 +156,8 @@ def get_remote_job_from_proto(
         return RemoteBatchIngestionJob(service, grpc_extra_param_provider, job.id)
     elif job.type == JobType.STREAM_INGESTION_JOB:
         return RemoteStreamIngestionJob(service, grpc_extra_param_provider, job.id)
+    else:
+        raise ValueError(
+            f"Invalid Job Type {job.type}, has to be one of "
+            f"{(JobType.RETRIEVAL_JOB, JobType.BATCH_INGESTION_JOB, JobType.STREAM_INGESTION_JOB)}"
+        )
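
Note (not part of the patch): with this change the standalone launcher no longer reads spark_extra_options / FEAST_SPARK_EXTRA_OPTIONS; the GCS and S3 connector jars and Hadoop filesystem settings are always passed to spark-submit. As a minimal sketch of the resulting invocation built by spark_submit(), the spark-submit path, master URL, job name, main file path, and trailing arguments below are placeholders (assumptions); only the --jars/--conf entries are the ones hardcoded above.

    # Illustrative sketch only; values in <angle brackets> are placeholders, not from this diff.
    submission_cmd = [
        "<SPARK_HOME>/bin/spark-submit",
        "--master", "local[*]",  # FEAST_SPARK_STANDALONE_MASTER
        "--name", "<job name>",
        "--conf", "spark.sql.session.timeZone=UTC",
        "--packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_<BQ_CONNECTOR_VERSION>",
        # Previously injected via FEAST_SPARK_EXTRA_OPTIONS / spark_extra_options,
        # now always added by the standalone launcher:
        "--jars",
        "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar,"
        "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar,"
        "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar",
        "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem",
        "--conf", "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
        "<main file path>",
        # ...job-specific arguments from job_params.get_arguments()...
    ]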