Skip to content

Commit

Permalink
Streaming Ingestion Job supports AVRO format as input (#1072)
Browse files Browse the repository at this point in the history
* dataformat in spark job

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* clean

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* avro format supported by streaming job

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* better check if started

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* lint

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* ci deps

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* cleam some python deps

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* use develop version

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Oct 20, 2020
1 parent 2060053 commit 69d3f47
Show file tree
Hide file tree
Showing 22 changed files with 419 additions and 71 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/master_only.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,15 @@ jobs:
- uses: stCarolas/setup-maven@v3
with:
maven-version: 3.6.3
- name: build-jar
- name: Publish develop version of ingestion job
run: |
if [ ${GITHUB_REF#refs/*/} == "master" ]; then
make build-java-no-tests REVISION=develop
gsutil cp ./spark/ingestion/target/feast-ingestion-spark-develop.jar gs://${PUBLISH_BUCKET}/spark/ingestion/
fi
- name: Get version
run: echo ::set-env name=RELEASE_VERSION::${GITHUB_REF#refs/*/}
- name: Publish tagged version of ingestion job
run: |
SEMVER_REGEX='^v[0-9]+\.[0-9]+\.[0-9]+(-([0-9A-Za-z-]+(\.[0-9A-Za-z-]+)*))?$'
if echo "${RELEASE_VERSION}" | grep -P "$SEMVER_REGEX" &>/dev/null ; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public static void validate(DataSource spec) {
spec.getKafkaOptions().getMessageFormat().getProtoFormat().getClassPath(),
"FeatureTable");
break;
case AVRO_FORMAT:
break;
default:
throw new UnsupportedOperationException(
String.format(
Expand All @@ -68,6 +70,8 @@ public static void validate(DataSource spec) {
spec.getKinesisOptions().getRecordFormat().getProtoFormat().getClassPath(),
"FeatureTable");
break;
case AVRO_FORMAT:
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported Stream Format for Kafka Source Type: %s", recordFormat));
Expand Down
2 changes: 1 addition & 1 deletion infra/charts/feast/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ feast-core:

feast-jobcontroller:
# feast-jobcontroller.enabled -- Flag to install Feast Job Controller
enabled: true
enabled: false

feast-online-serving:
# feast-online-serving.enabled -- Flag to install Feast Online Serving
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class AuthProvider(Enum):
# Authentication Provider - Google OpenID/OAuth
CONFIG_AUTH_PROVIDER: "google",
CONFIG_SPARK_LAUNCHER: "dataproc",
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/feast-ingestion-spark-0.8-SNAPSHOT.jar",
CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/feast-ingestion-spark-develop.jar",
CONFIG_REDIS_HOST: "localhost",
CONFIG_REDIS_PORT: "6379",
CONFIG_REDIS_SSL: "False",
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SparkJobFailure(Exception):


class SparkJobStatus(Enum):
STARTING = 0
IN_PROGRESS = 1
FAILED = 2
COMPLETED = 3
Expand Down Expand Up @@ -48,6 +49,13 @@ def get_status(self) -> SparkJobStatus:
"""
raise NotImplementedError

@abc.abstractmethod
def cancel(self):
"""
Manually terminate job
"""
raise NotImplementedError


class SparkJobParameters(abc.ABC):
@abc.abstractmethod
Expand Down
14 changes: 10 additions & 4 deletions sdk/python/feast/pyspark/historical_feature_retrieval_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class FileSource(Source):
options (Optional[Dict[str, str]]): Options to be passed to spark while reading the file source.
"""

PROTO_FORMAT_TO_SPARK = {
"ParquetFormat": "parquet",
"AvroFormat": "avro",
"CSVFormat": "csv",
}

def __init__(
self,
format: str,
Expand Down Expand Up @@ -147,7 +153,7 @@ def spark_path(self) -> str:
def _source_from_dict(dct: Dict) -> Source:
if "file" in dct.keys():
return FileSource(
dct["file"]["format"],
FileSource.PROTO_FORMAT_TO_SPARK[dct["file"]["format"]["json_class"]],
dct["file"]["path"],
dct["file"]["event_timestamp_column"],
dct["file"].get("created_timestamp_column"),
Expand Down Expand Up @@ -635,20 +641,20 @@ def retrieve_historical_features(
Example:
>>> entity_source_conf = {
"format": "csv",
"format": {"jsonClass": "ParquetFormat"},
"path": "file:///some_dir/customer_driver_pairs.csv"),
"options": {"inferSchema": "true", "header": "true"},
"field_mapping": {"id": "driver_id"}
}
>>> feature_tables_sources_conf = [
{
"format": "parquet",
"format": {"json_class": "ParquetFormat"},
"path": "gs://some_bucket/bookings.parquet"),
"field_mapping": {"id": "driver_id"}
},
{
"format": "avro",
"format": {"json_class": "AvroFormat", schema_json: "..avro schema.."},
"path": "s3://some_bucket/transactions.avro"),
}
]
Expand Down
25 changes: 15 additions & 10 deletions sdk/python/feast/pyspark/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ def resolve_launcher(config: Config) -> JobLauncher:
return _launchers[config.get(CONFIG_SPARK_LAUNCHER)](config)


_SOURCES = {FileSource: "file", BigQuerySource: "bq", KafkaSource: "kafka"}


def _source_to_argument(source: DataSource):
common_properties = {
"field_mapping": dict(source.field_mapping),
Expand All @@ -97,20 +94,28 @@ def _source_to_argument(source: DataSource):
"date_partition_column": source.date_partition_column,
}

kind = _SOURCES[type(source)]
properties = {**common_properties}

if isinstance(source, FileSource):
properties["path"] = source.file_options.file_url
properties["format"] = str(source.file_options.file_format)
return {kind: properties}
properties["format"] = dict(
json_class=source.file_options.file_format.__class__.__name__
)
return {"file": properties}

if isinstance(source, BigQuerySource):
properties["table_ref"] = source.bigquery_options.table_ref
return {kind: properties}
return {"bq": properties}

if isinstance(source, KafkaSource):
properties["topic"] = source.kafka_options.topic
properties["classpath"] = source.kafka_options.class_path
properties["bootstrap_servers"] = source.kafka_options.bootstrap_servers
return {kind: properties}
properties["topic"] = source.kafka_options.topic
properties["format"] = {
**source.kafka_options.message_format.__dict__,
"json_class": source.kafka_options.message_format.__class__.__name__,
}
return {"kafka": properties}

raise NotImplementedError(f"Unsupported Datasource: {type(source)}")


Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/pyspark/launchers/aws/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ def get_status(self) -> SparkJobStatus:
# we should never get here
raise Exception("Invalid EMR state")

def cancel(self):
raise NotImplementedError


class EmrRetrievalJob(EmrJobMixin, RetrievalJob):
"""
Expand Down
17 changes: 13 additions & 4 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def get_status(self) -> SparkJobStatus:

return SparkJobStatus.FAILED

def cancel(self):
self._operation.cancel()


class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob):
"""
Expand All @@ -71,7 +74,13 @@ def get_output_file_uri(self, timeout_sec=None):

class DataprocBatchIngestionJob(DataprocJobMixin, BatchIngestionJob):
"""
Ingestion job result for a Dataproc cluster
Batch Ingestion job result for a Dataproc cluster
"""


class DataprocStreamingIngestionJob(DataprocJobMixin, StreamIngestionJob):
"""
Streaming Ingestion job result for a Dataproc cluster
"""


Expand Down Expand Up @@ -151,14 +160,14 @@ def historical_feature_retrieval(
)

def offline_to_online_ingestion(
self, job_params: BatchIngestionJobParameters
self, ingestion_job_params: BatchIngestionJobParameters
) -> BatchIngestionJob:
return DataprocBatchIngestionJob(self.dataproc_submit(job_params))
return DataprocBatchIngestionJob(self.dataproc_submit(ingestion_job_params))

def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
) -> StreamIngestionJob:
raise NotImplementedError
return DataprocStreamingIngestionJob(self.dataproc_submit(ingestion_job_params))

def stage_dataframe(
self, df, event_timestamp_column: str, created_timestamp_column: str,
Expand Down
99 changes: 90 additions & 9 deletions sdk/python/feast/pyspark/launchers/standalone/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import os
import socket
import subprocess
import uuid
from contextlib import closing

import requests
from requests.exceptions import RequestException

from feast.pyspark.abc import (
BatchIngestionJob,
Expand All @@ -16,28 +21,77 @@
)


def _find_free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]


class StandaloneClusterJobMixin:
def __init__(self, job_id: str, process: subprocess.Popen):
def __init__(
self, job_id: str, job_name: str, process: subprocess.Popen, ui_port: int = None
):
self._job_id = job_id
self._job_name = job_name
self._process = process
self._ui_port = ui_port

def get_id(self) -> str:
return self._job_id

def check_if_started(self):
if not self._ui_port:
return True

try:
applications = requests.get(
f"http://localhost:{self._ui_port}/api/v1/applications"
).json()
except RequestException:
return False

app = next(
iter(app for app in applications if app["name"] == self._job_name), None
)
if not app:
return False

stages = requests.get(
f"http://localhost:{self._ui_port}/api/v1/applications/{app['id']}/stages"
).json()
return bool(stages)

def get_status(self) -> SparkJobStatus:
code = self._process.poll()
if code is None:
if not self.check_if_started():
return SparkJobStatus.STARTING

return SparkJobStatus.IN_PROGRESS

if code != 0:
return SparkJobStatus.FAILED

return SparkJobStatus.COMPLETED

def cancel(self):
self._process.terminate()


class StandaloneClusterBatchIngestionJob(StandaloneClusterJobMixin, BatchIngestionJob):
"""
Ingestion job result for a standalone spark cluster
Batch Ingestion job result for a standalone spark cluster
"""

pass


class StandaloneClusterStreamingIngestionJob(
StandaloneClusterJobMixin, StreamIngestionJob
):
"""
Streaming Ingestion job result for a standalone spark cluster
"""

pass
Expand All @@ -48,7 +102,13 @@ class StandaloneClusterRetrievalJob(StandaloneClusterJobMixin, RetrievalJob):
Historical feature retrieval job result for a standalone spark cluster
"""

def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str):
def __init__(
self,
job_id: str,
job_name: str,
process: subprocess.Popen,
output_file_uri: str,
):
"""
This is the returned historical feature retrieval job result for StandaloneClusterLauncher.
Expand All @@ -57,7 +117,7 @@ def __init__(self, job_id: str, process: subprocess.Popen, output_file_uri: str)
process (subprocess.Popen): Pyspark driver process, spawned by the launcher.
output_file_uri (str): Uri to the historical feature retrieval job output file.
"""
super().__init__(job_id, process)
super().__init__(job_id, job_name, process)
self._output_file_uri = output_file_uri

def get_output_file_uri(self, timeout_sec: int = None):
Expand Down Expand Up @@ -100,7 +160,9 @@ def __init__(self, master_url: str, spark_home: str = None):
def spark_submit_script_path(self):
return os.path.join(self.spark_home, "bin/spark-submit")

def spark_submit(self, job_params: SparkJobParameters) -> subprocess.Popen:
def spark_submit(
self, job_params: SparkJobParameters, ui_port: int = None
) -> subprocess.Popen:
submission_cmd = [
self.spark_submit_script_path,
"--master",
Expand All @@ -112,6 +174,9 @@ def spark_submit(self, job_params: SparkJobParameters) -> subprocess.Popen:
if job_params.get_class_name():
submission_cmd.extend(["--class", job_params.get_class_name()])

if ui_port:
submission_cmd.extend(["--conf", f"spark.ui.port={ui_port}"])

submission_cmd.append(job_params.get_main_file_path())
submission_cmd.extend(job_params.get_arguments())

Expand All @@ -122,19 +187,35 @@ def historical_feature_retrieval(
) -> RetrievalJob:
job_id = str(uuid.uuid4())
return StandaloneClusterRetrievalJob(
job_id, self.spark_submit(job_params), job_params.get_destination_path()
job_id,
job_params.get_name(),
self.spark_submit(job_params),
job_params.get_destination_path(),
)

def offline_to_online_ingestion(
self, job_params: BatchIngestionJobParameters
self, ingestion_job_params: BatchIngestionJobParameters
) -> BatchIngestionJob:
job_id = str(uuid.uuid4())
return StandaloneClusterBatchIngestionJob(job_id, self.spark_submit(job_params))
ui_port = _find_free_port()
return StandaloneClusterBatchIngestionJob(
job_id,
ingestion_job_params.get_name(),
self.spark_submit(ingestion_job_params, ui_port),
ui_port,
)

def start_stream_to_online_ingestion(
self, ingestion_job_params: StreamIngestionJobParameters
) -> StreamIngestionJob:
raise NotImplementedError
job_id = str(uuid.uuid4())
ui_port = _find_free_port()
return StandaloneClusterStreamingIngestionJob(
job_id,
ingestion_job_params.get_name(),
self.spark_submit(ingestion_job_params, ui_port),
ui_port,
)

def stage_dataframe(
self, df, event_timestamp_column: str, created_timestamp_column: str,
Expand Down
Loading

0 comments on commit 69d3f47

Please sign in to comment.