CLI commands to start/stop/list streaming ingestion jobs on EMR
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
oavdeev committed Oct 13, 2020
1 parent 2cd019c commit cc5eeec
Showing 2 changed files with 255 additions and 5 deletions.
53 changes: 53 additions & 0 deletions sdk/python/feast/cli.py
@@ -371,5 +371,58 @@ def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
feast.pyspark.aws.jobs.sync_offline_to_online(client, table, start_time, end_time)


@cli.command()
@click.option(
"--feature-table",
"-t",
help="Feature table name to ingest data into",
required=True,
)
@click.option(
"--jar", "-j", help="Feature table name to ingest data into", default="",
)
def start_stream_to_online(feature_table: str, jar: str):
"""
Start stream to online sync job.
"""
import feast.pyspark.aws.jobs

client = Client()
table = client.get_feature_table(feature_table)
feast.pyspark.aws.jobs.start_stream_to_online(client, table, [jar] if jar else [])


@cli.command()
@click.option(
"--feature-table",
"-t",
help="Feature table name to ingest data into",
required=True,
)
def stop_stream_to_online(feature_table: str):
"""
Stop stream to online sync job.
"""
import feast.pyspark.aws.jobs

feast.pyspark.aws.jobs.stop_stream_to_online(feature_table)


@cli.command()
def list_emr_jobs():
"""
List Feast EMR jobs.
"""
from tabulate import tabulate

import feast.pyspark.aws.jobs

jobs = feast.pyspark.aws.jobs.list_jobs(None, None)

print(
tabulate(jobs, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain")
)


if __name__ == "__main__":
cli()
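
Since these are plain click subcommands, they can be smoke-tested with click's CliRunner. A minimal sketch, assuming a working job-service config and a hypothetical feature table named "driver_stats":

from click.testing import CliRunner

from feast.cli import cli

runner = CliRunner()

# Start the streaming job for a (hypothetical) registered table
result = runner.invoke(
    cli, ["start-stream-to-online", "--feature-table", "driver_stats"]
)
print(result.exit_code, result.output)

# List active Feast jobs on EMR as a plain-text table
result = runner.invoke(cli, ["list-emr-jobs"])
print(result.output)
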
207 changes: 202 additions & 5 deletions sdk/python/feast/pyspark/aws/jobs.py
Expand Up @@ -4,7 +4,8 @@
import os
import random
import string
from typing import Any, Dict, Tuple
import time
from typing import Any, Dict, List, NamedTuple, Optional, Tuple

import boto3
import botocore
@@ -89,6 +90,13 @@
# ssl: true

SUPPORTED_EMR_VERSION = "emr-6.0.0"
STREAM_TO_ONLINE_JOB_TYPE = "STREAM_TO_ONLINE_JOB"
OFFLINE_TO_ONLINE_JOB_TYPE = "OFFLINE_TO_ONLINE_JOB"


# EMR Step states considered "active", i.e. not terminated
ACTIVE_STEP_STATES = ["PENDING", "CANCEL_PENDING", "RUNNING"]
TERMINAL_STEP_STATES = ["COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"]


def _sanity_check_config(config, config_path: str):
@@ -121,7 +129,7 @@ def _get_config_path() -> str:

def _load_job_service_config(config_path: str):
with open(config_path) as f:
config = yaml.load(f)
config = yaml.safe_load(f)
_sanity_check_config(config, config_path)
return config

@@ -140,6 +148,18 @@ def _batch_source_to_json(batch_source):
}


def _stream_source_to_json(stream_source):
return {
"kafka": {
"bootstrapServers": stream_source.kafka_options.bootstrap_servers,
"mapping": dict(stream_source.field_mapping),
"topic": stream_source.kafka_options.topic,
"timestampColumn": stream_source.timestamp_column,
"classpath": stream_source.kafka_options.class_path,
}
}
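
For illustration only, a hypothetical Kafka stream source would serialize to something like this (all values made up):

# Hypothetical output of _stream_source_to_json:
{
    "kafka": {
        "bootstrapServers": "broker-1:9092,broker-2:9092",
        "mapping": {"driver_id": "id"},
        "topic": "driver-stats",
        "timestampColumn": "event_timestamp",
        "classpath": "com.example.protos.DriverStats",
    }
}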


def _feature_table_to_json(client: Client, feature_table):
return {
"features": [
@@ -215,7 +235,7 @@ def _upload_jar(jar_s3_prefix: str, local_path: str) -> str:
)


def _get_jar_s3_path(config) -> str:
def _get_ingestion_jar_s3_path(config) -> str:
"""
Extract job jar path from the configuration, upload it to S3 if necessary and return S3 path.
"""
@@ -245,7 +265,7 @@ def _sync_offline_to_online_step(
"Properties": [
{
"Key": "feast.step_metadata.job_type",
"Value": "OFFLINE_TO_ONLINE_JOB",
"Value": OFFLINE_TO_ONLINE_JOB_TYPE,
},
{
"Key": "feast.step_metadata.offline_to_online.table_name",
@@ -266,7 +286,7 @@
"feast.ingestion.IngestionJob",
"--packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.2",
_get_jar_s3_path(config),
_get_ingestion_jar_s3_path(config),
"--mode",
"offline",
"--feature-table",
@@ -317,3 +337,180 @@ def sync_offline_to_online(
config = _load_job_service_config(_get_config_path())
step = _sync_offline_to_online_step(client, config, feature_table, start_ts, end_ts)
_submit_emr_job(step, config)


def _stream_ingestion_step(
client: Client, config, feature_table, jars: List[str]
) -> Dict[str, Any]:
feature_table_json = _feature_table_to_json(client, feature_table)
source_json = _stream_source_to_json(feature_table.stream_source)

if jars:
jars_args = ["--jars", ",".join(jars)]
else:
jars_args = []

return {
"Name": "Feast Streaming Ingestion",
"HadoopJarStep": {
"Properties": [
{
"Key": "feast.step_metadata.job_type",
"Value": STREAM_TO_ONLINE_JOB_TYPE,
},
{
"Key": "feast.step_metadata.stream_to_online.table_name",
"Value": feature_table.name,
},
],
"Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
+ jars_args
+ [
"--packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.2",
_get_ingestion_jar_s3_path(config),
"--mode",
"online",
"--feature-table",
json.dumps(feature_table_json),
"--source",
json.dumps(source_json),
"--redis",
json.dumps(config["redisConfig"]),
],
"Jar": "command-runner.jar",
},
}
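
Concretely, the step above runs spark-submit through EMR's command-runner.jar. For a hypothetical table and one extra jar, the assembled "Args" would look roughly like this (paths and JSON payloads are made up and abbreviated):

# Illustrative spark-submit argument list produced by _stream_ingestion_step:
[
    "spark-submit",
    "--class", "feast.ingestion.IngestionJob",
    "--jars", "s3://my-bucket/extra-udfs.jar",
    "--packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.2",
    "s3://my-bucket/feast-ingestion.jar",
    "--mode", "online",
    "--feature-table", '{"name": "driver_stats", ...}',
    "--source", '{"kafka": ...}',
    "--redis", '{"host": ..., "port": ...}',
]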


def start_stream_to_online(
client: Client, feature_table: FeatureTable, jars: List[str]
):
if _get_stream_to_online_job(client, feature_table):
raise Exception("Job already running")

config = _load_job_service_config(_get_config_path())
step = _stream_ingestion_step(client, config, feature_table, jars)
_submit_emr_job(step, config)
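
The guard above makes a second start for the same table fail fast. A sketch, assuming client and table were obtained the same way the CLI command obtains them:

# Sketch: a duplicate start for the same table raises
start_stream_to_online(client, table, jars=[])
try:
    start_stream_to_online(client, table, jars=[])
except Exception as e:
    print(e)  # "Job already running"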


class JobInfo(NamedTuple):
job_type: str
cluster_id: str
step_id: str
table_name: str
state: str


def list_jobs(
job_type: Optional[str], table_name: Optional[str], active_only=True
) -> List[JobInfo]:
"""
List Feast EMR jobs.
Args:
job_type: optional filter by job type
table_name: optional filter by table name
active_only: if True, return only "active" jobs, i.e. those that are pending or running rather than terminated
Returns:
A list of jobs.
"""
config = _load_job_service_config(_get_config_path())
aws_config = config.get("aws", {})
emr = boto3.client("emr", region_name=aws_config.get("region"))
paginator = emr.get_paginator("list_clusters")
res: List[JobInfo] = []
for page in paginator.paginate(
ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING", "TERMINATING"]
):
for cluster in page["Clusters"]:
cluster_id = cluster["Id"]
step_paginator = emr.get_paginator("list_steps")

list_steps_params = dict(ClusterId=cluster_id)
if active_only:
list_steps_params["StepStates"] = ACTIVE_STEP_STATES

for step_page in step_paginator.paginate(**list_steps_params):
for step in step_page["Steps"]:
props = step["Config"]["Properties"]
if "feast.step_metadata.job_type" not in props:
continue

step_table_name = props.get(
"feast.step_metadata.stream_to_online.table_name"
) or props.get("feast.step_metadata.offline_to_online.table_name")
step_job_type = props["feast.step_metadata.job_type"]

if table_name and step_table_name != table_name:
continue

if job_type and step_job_type != job_type:
continue

res.append(
JobInfo(
job_type=step_job_type,
cluster_id=cluster_id,
step_id=step["Id"],
state=step["Status"]["State"],
table_name=step_table_name,
)
)
return res
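
A short usage sketch (the table name is hypothetical):

# Find active streaming ingestion jobs for one table
active = list_jobs(
    job_type=STREAM_TO_ONLINE_JOB_TYPE,
    table_name="driver_stats",
    active_only=True,
)
for job in active:
    print(f"{job.step_id} on {job.cluster_id}: {job.state}")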


def _get_stream_to_online_job(
client: Client, feature_table: FeatureTable
) -> List[JobInfo]:
return list_jobs(
job_type=STREAM_TO_ONLINE_JOB_TYPE,
table_name=feature_table.name,
active_only=True,
)


def _wait_for_job_state(
emr_client, job: JobInfo, desired_states: List[str], timeout_seconds=90
):
"""
Wait up to timeout_seconds for the job to reach one of the desired states.
"""
start_time = time.time()
while time.time() - start_time < timeout_seconds:
response = emr_client.describe_step(
ClusterId=job.cluster_id, StepId=job.step_id
)
state = response["Step"]["Status"]["State"]
if state in desired_states:
return
else:
time.sleep(0.5)
else:
raise TimeoutError(
f'Timeout waiting for job state to become {"|".join(desired_states)}'
)


def _cancel_job(job_type, table_name):
"""
Cancel any active EMR jobs matching the given type and table, and wait for them to reach a terminal state.
"""
jobs = list_jobs(job_type=job_type, table_name=table_name, active_only=True)
config = _load_job_service_config(_get_config_path())
aws_config = config.get("aws", {})

emr = boto3.client("emr", region_name=aws_config.get("region"))
for job in jobs:
emr.cancel_steps(ClusterId=job.cluster_id, StepIds=[job.step_id])

for job in jobs:
_wait_for_job_state(emr, job, TERMINAL_STEP_STATES)


def stop_stream_to_online(table_name: str):
"""
Stop the stream-to-online ingestion job for the table.
"""
_cancel_job(STREAM_TO_ONLINE_JOB_TYPE, table_name)
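
End to end, tearing a job down then looks like this (table name hypothetical):

# Sketch: stop the streaming job and verify nothing is left running
stop_stream_to_online("driver_stats")
assert not list_jobs(
    job_type=STREAM_TO_ONLINE_JOB_TYPE, table_name="driver_stats", active_only=True
)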
