Skip to content

Commit

Permalink
Cleanup cli and python dependencies
Browse files Browse the repository at this point in the history
Signed-off-by: Terence <terencelimxp@gmail.com>
  • Loading branch information
terryyylim committed Oct 20, 2020
1 parent 69d3f47 commit 801a77c
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 31 deletions.
108 changes: 79 additions & 29 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from feast.client import Client
from feast.config import Config
from feast.constants import CONFIG_SPARK_LAUNCHER
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
Expand Down Expand Up @@ -351,78 +352,127 @@ def project_list():
print(tabulate(table, headers=["NAME"], tablefmt="plain"))


@cli.group(name="jobs")
def job() -> None:
    """
    Create and manage jobs
    """


@job.command(name="sync-offline-to-online")
@click.option(
    "--feature-table",
    "-t",
    help="Feature table name of data to be synced",
    type=click.STRING,
    required=True,
)
@click.option("--start-time", "-s", help="Interval start", required=True)
@click.option("--end-time", "-e", help="Interval end", required=True)
def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
    """
    Sync offline store data to online store
    """
    spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER)

    # Only the EMR launcher is supported here; fail fast for anything else.
    if spark_launcher != "emr":
        raise NotImplementedError(
            f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}"
        )

    # Deferred import: the AWS job helpers are only needed on the EMR path.
    import feast.pyspark.aws.jobs

    client = Client()
    table = client.get_feature_table(feature_table)
    feast.pyspark.aws.jobs.sync_offline_to_online(client, table, start_time, end_time)


@job.command(name="start-stream-to-online")
@click.option(
    "--feature-table",
    "-t",
    help="Feature table name of job to be started",
    type=click.STRING,
    required=True,
)
@click.option(
    "--jar",
    "-j",
    help="The file path to the uber jar for offline to online ingestion spark job",
    default="",
)
def start_stream_to_online(feature_table: str, jar: str):
    """
    Start stream to online sync job
    """
    spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER)

    # Only the EMR launcher is supported here; fail fast for anything else.
    if spark_launcher != "emr":
        raise NotImplementedError(
            f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}"
        )

    # Deferred import: the AWS job helpers are only needed on the EMR path.
    import feast.pyspark.aws.jobs

    client = Client()
    table = client.get_feature_table(feature_table)
    # An empty --jar means "no extra jars" rather than a single empty path.
    extra_jars = [jar] if jar else []
    feast.pyspark.aws.jobs.start_stream_to_online(client, table, extra_jars)


@job.command(name="stop-stream-to-online")
@click.option(
    "--feature-table",
    "-t",
    help="Feature table name of job to be stopped",
    type=click.STRING,
    required=True,
)
def stop_stream_to_online(feature_table: str):
    """
    Stop stream to online sync job
    """
    spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER)

    # Only the EMR launcher is supported here; fail fast for anything else.
    if spark_launcher != "emr":
        raise NotImplementedError(
            f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}"
        )

    # Deferred import: the AWS job helpers are only needed on the EMR path.
    import feast.pyspark.aws.jobs

    feast.pyspark.aws.jobs.stop_stream_to_online(feature_table)


@job.command()
def list_jobs():
    """
    List jobs
    """
    from tabulate import tabulate

    spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER)

    # Only the EMR launcher is supported here; fail fast for anything else.
    if spark_launcher != "emr":
        raise NotImplementedError(
            f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}"
        )

    # Deferred import: the AWS job helpers are only needed on the EMR path.
    import feast.pyspark.aws.jobs

    # No filters: list every job regardless of project or table.
    job_rows = feast.pyspark.aws.jobs.list_jobs(None, None)
    print(
        tabulate(
            job_rows, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain"
        )
    )


@cli.command()
Expand Down
2 changes: 0 additions & 2 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@
"protobuf>=3.10",
"PyYAML==5.1.*",
"fastavro>=0.22.11,<0.23",
"kafka-python==1.*",
"tabulate==0.8.*",
"toml==0.10.*",
"tqdm==4.*",
"pyarrow<0.16.0,>=0.15.1",
"numpy",
"google",
"confluent_kafka",
]

# README file from Feast repo root directory
Expand Down

0 comments on commit 801a77c

Please sign in to comment.