Remove spark extra configs and hardcode spark jars/conf in standalone mode

Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com>
tsotnet committed Nov 3, 2020
1 parent 7e43530 commit 3335ae4
Showing 7 changed files with 19 additions and 44 deletions.
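In standalone mode, this commit trades the user-supplied extra-options string (the FEAST_SPARK_EXTRA_OPTIONS env var / spark_extra_options config key) for connector jars and Hadoop filesystem configs that the launcher always adds itself. A rough before/after sketch of the resulting spark-submit flags, using only values that appear in the diffs below; the surrounding code is schematic, not the actual launcher:

```python
# Schematic before/after of the standalone launcher's spark-submit flags.
# URLs and conf keys are taken from this commit; the surrounding code is illustrative only.

# Before: connector jars/confs had to be injected by the user via
# FEAST_SPARK_EXTRA_OPTIONS / the spark_extra_options config key.
extra_options = (
    "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar "
    "--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
submission_cmd_before = ["spark-submit"] + extra_options.split(" ")

# After: the standalone launcher always appends the GCS and S3 connectors itself,
# so the extra-options plumbing (env var, config key, get_extra_options()) is removed.
submission_cmd_after = [
    "spark-submit",
    "--jars",
    "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar,"
    "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar,"
    "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar",
    "--conf", "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem",
    "--conf", "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
]
```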
2 changes: 0 additions & 2 deletions infra/docker/jobservice/Dockerfile
@@ -30,7 +30,5 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa
ENV FEAST_SPARK_LAUNCHER standalone
ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
ENV FEAST_SPARK_HOME $SPARK_HOME
ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"

CMD ["feast", "server"]
2 changes: 0 additions & 2 deletions infra/docker/jupyter/Dockerfile
@@ -29,7 +29,5 @@ COPY examples .
ENV FEAST_SPARK_LAUNCHER standalone
ENV FEAST_SPARK_STANDALONE_MASTER "local[*]"
ENV FEAST_SPARK_HOME $SPARK_HOME
ENV FEAST_SPARK_EXTRA_OPTIONS "--jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar \
--conf spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"

CMD ["start-notebook.sh", "--NotebookApp.token=''"]
3 changes: 0 additions & 3 deletions sdk/python/feast/constants.py
@@ -93,8 +93,6 @@ class AuthProvider(Enum):
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location"

CONFIG_SPARK_EXTRA_OPTIONS = "spark_extra_options"

# Configuration option default values
FEAST_DEFAULT_OPTIONS = {
# Default Feast project to use
@@ -136,5 +134,4 @@ class AuthProvider(Enum):
CONFIG_REDIS_PORT: "6379",
CONFIG_REDIS_SSL: "False",
CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet",
CONFIG_SPARK_EXTRA_OPTIONS: "",
}
24 changes: 0 additions & 24 deletions sdk/python/feast/pyspark/abc.py
@@ -113,15 +113,6 @@ def get_arguments(self) -> List[str]:
"""
raise NotImplementedError

@abc.abstractmethod
def get_extra_options(self) -> str:
"""
Spark job dependencies (expected to resolved from maven)
Returns:
str: Spark job dependencies.
"""
raise NotImplementedError


class RetrievalJobParameters(SparkJobParameters):
def __init__(
@@ -130,7 +121,6 @@ def __init__(
feature_tables_sources: List[Dict],
entity_source: Dict,
destination: Dict,
extra_options: str = "",
):
"""
Args:
@@ -242,7 +232,6 @@ def __init__(
self._feature_tables_sources = feature_tables_sources
self._entity_source = entity_source
self._destination = destination
self._extra_options = extra_options

def get_name(self) -> str:
all_feature_tables_names = [ft["name"] for ft in self._feature_tables]
@@ -271,9 +260,6 @@ def get_arguments(self) -> List[str]:
def get_destination_path(self) -> str:
return self._destination["path"]

def get_extra_options(self) -> str:
return self._extra_options


class RetrievalJob(SparkJob):
"""
@@ -315,7 +301,6 @@ def __init__(
redis_host: str,
redis_port: int,
redis_ssl: bool,
extra_options: str = "",
):
self._feature_table = feature_table
self._source = source
@@ -325,7 +310,6 @@ def __init__(
self._redis_host = redis_host
self._redis_port = redis_port
self._redis_ssl = redis_ssl
self._extra_options = extra_options

def get_name(self) -> str:
return (
@@ -364,9 +348,6 @@ def get_arguments(self) -> List[str]:
json.dumps(self._get_redis_config()),
]

def get_extra_options(self) -> str:
return self._extra_options


class StreamIngestionJobParameters(SparkJobParameters):
def __init__(
@@ -378,7 +359,6 @@ def __init__(
redis_host: str,
redis_port: int,
redis_ssl: bool,
extra_options="",
):
self._feature_table = feature_table
self._source = source
@@ -387,7 +367,6 @@ def __init__(
self._redis_host = redis_host
self._redis_port = redis_port
self._redis_ssl = redis_ssl
self._extra_options = extra_options

def get_name(self) -> str:
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"
@@ -422,9 +401,6 @@ def get_arguments(self) -> List[str]:
json.dumps(self._get_redis_config()),
]

def get_extra_options(self) -> str:
return self._extra_options


class BatchIngestionJob(SparkJob):
"""
4 changes: 0 additions & 4 deletions sdk/python/feast/pyspark/launcher.py
@@ -16,7 +16,6 @@
CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH,
CONFIG_SPARK_EMR_LOG_LOCATION,
CONFIG_SPARK_EMR_REGION,
CONFIG_SPARK_EXTRA_OPTIONS,
CONFIG_SPARK_HOME,
CONFIG_SPARK_INGESTION_JOB_JAR,
CONFIG_SPARK_LAUNCHER,
@@ -192,7 +191,6 @@ def start_historical_feature_retrieval_job(
for feature_table in feature_tables
],
destination={"format": output_format, "path": output_path},
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
)
)

@@ -256,7 +254,6 @@ def start_offline_to_online_ingestion(
redis_host=client._config.get(CONFIG_REDIS_HOST),
redis_port=client._config.getint(CONFIG_REDIS_PORT),
redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
)
)

@@ -277,7 +274,6 @@ def start_stream_to_online_ingestion(
redis_host=client._config.get(CONFIG_REDIS_HOST),
redis_port=client._config.getint(CONFIG_REDIS_PORT),
redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
extra_options=client._config.get(CONFIG_SPARK_EXTRA_OPTIONS),
)
)

21 changes: 13 additions & 8 deletions sdk/python/feast/pyspark/launchers/standalone/local.py
@@ -3,7 +3,7 @@
import subprocess
import uuid
from contextlib import closing
from typing import List
from typing import Dict, List

import requests
from requests.exceptions import RequestException
@@ -24,7 +24,7 @@

# In-memory cache of Spark jobs
# This is necessary since we can't query Spark jobs in local mode
JOB_CACHE = {}
JOB_CACHE: Dict[str, SparkJob] = {}


def _find_free_port():
@@ -204,12 +204,17 @@ def spark_submit(
"spark.sql.session.timeZone=UTC", # ignore local timezone
"--packages",
f"com.google.cloud.spark:spark-bigquery-with-dependencies_{self.BQ_CONNECTOR_VERSION}",
"--jars",
"https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar,"
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar,"
"https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar",
"--conf",
"spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem",
"--conf",
"spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
]
)

if job_params.get_extra_options():
submission_cmd.extend(job_params.get_extra_options().split(" "))

submission_cmd.append(job_params.get_main_file_path())
submission_cmd.extend(job_params.get_arguments())

@@ -225,7 +230,7 @@ def historical_feature_retrieval(
self.spark_submit(job_params),
job_params.get_destination_path(),
)
JOB_CACHE[job_id] = job # type: ignore
JOB_CACHE[job_id] = job
return job

def offline_to_online_ingestion(
@@ -239,7 +244,7 @@ def offline_to_online_ingestion(
self.spark_submit(ingestion_job_params, ui_port),
ui_port,
)
JOB_CACHE[job_id] = job # type: ignore
JOB_CACHE[job_id] = job
return job

def start_stream_to_online_ingestion(
@@ -253,7 +258,7 @@ def start_stream_to_online_ingestion(
self.spark_submit(ingestion_job_params, ui_port),
ui_port,
)
JOB_CACHE[job_id] = job # type: ignore
JOB_CACHE[job_id] = job
return job

def stage_dataframe(self, df, event_timestamp_column: str):
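Aside from the hardcoded jars, the local launcher's job cache gains an explicit value type, which is what lets the three `JOB_CACHE[job_id] = job` assignments drop their `# type: ignore` comments. A minimal sketch of the pattern, assuming a hypothetical SparkJob subclass purely for illustration:

```python
from typing import Dict


class SparkJob:
    """Stand-in for feast.pyspark.abc.SparkJob."""


class StandaloneRetrievalJob(SparkJob):
    """Hypothetical concrete job class, for illustration only."""


# With the value type declared up front, a type checker accepts any SparkJob
# subclass being stored, so no per-assignment "# type: ignore" is needed.
JOB_CACHE: Dict[str, SparkJob] = {}
JOB_CACHE["historical-feature-retrieval-1234"] = StandaloneRetrievalJob()
```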
7 changes: 6 additions & 1 deletion sdk/python/feast/remote_job.py
@@ -8,10 +8,10 @@
from feast.pyspark.abc import (
BatchIngestionJob,
RetrievalJob,
SparkJob,
SparkJobFailure,
SparkJobStatus,
StreamIngestionJob,
SparkJob,
)

GrpcExtraParamProvider = Callable[[], Dict[str, Any]]
@@ -156,3 +156,8 @@ def get_remote_job_from_proto(
return RemoteBatchIngestionJob(service, grpc_extra_param_provider, job.id)
elif job.type == JobType.STREAM_INGESTION_JOB:
return RemoteStreamIngestionJob(service, grpc_extra_param_provider, job.id)
else:
raise ValueError(
f"Invalid Job Type {job.type}, has to be one of "
f"{(JobType.RETRIEVAL_JOB, JobType.BATCH_INGESTION_JOB, JobType.STREAM_INGESTION_JOB)}"
)
