Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup CLI and Python dependencies #1062

Merged
merged 2 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 51 additions & 20 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from feast.client import Client
from feast.config import Config
from feast.constants import CONFIG_SPARK_LAUNCHER
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
Expand Down Expand Up @@ -351,18 +352,27 @@ def project_list():
print(tabulate(table, headers=["NAME"], tablefmt="plain"))


@cli.command()
@cli.group(name="jobs")
def job():
"""
Create and manage jobs
"""
pass


@job.command(name="start-offline-to-online")
@click.option(
"--feature-table",
"-t",
help="Feature table name to ingest data into",
help="Feature table name of data to be synced",
type=click.STRING,
required=True,
)
@click.option("--start-time", "-s", help="Interval start", required=True)
@click.option("--end-time", "-e", help="Interval end", required=True)
def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
"""
Sync offline store to online.
Sync offline store data to online store
"""
from datetime import datetime

Expand All @@ -373,56 +383,77 @@ def sync_offline_to_online(feature_table: str, start_time: str, end_time: str):
)


@cli.command()
@job.command(name="start-stream-to-online")
@click.option(
"--feature-table",
"-t",
help="Feature table name to ingest data into",
help="Feature table name of job to be started",
type=click.STRING,
required=True,
)
@click.option(
"--jar", "-j", help="Feature table name to ingest data into", default="",
"--jar",
"-j",
help="The file path to the uber jar for offline to online ingestion spark job",
default="",
)
def start_stream_to_online(feature_table: str, jar: str):
"""
Start stream to online sync job.
Start stream to online sync job
"""

client = Client()
table = client.get_feature_table(feature_table)
client.start_stream_to_online_ingestion(table, [jar] if jar else [])


@cli.command()
@job.command(name="stop-stream-to-online")
@click.option(
"--feature-table",
"-t",
help="Feature table name to ingest data into",
help="Feature table name of job to be stopped",
type=click.STRING,
required=True,
)
def stop_stream_to_online(feature_table: str):
"""
Start stream to online sync job.
Stop stream to online sync job
"""
import feast.pyspark.aws.jobs

feast.pyspark.aws.jobs.stop_stream_to_online(feature_table)
spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER)

if spark_launcher == "emr":
import feast.pyspark.aws.jobs

@cli.command()
def list_emr_jobs():
feast.pyspark.aws.jobs.stop_stream_to_online(feature_table)
else:
raise NotImplementedError(
f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}"
)


@job.command()
def list_jobs():
"""
List jobs.
List jobs
"""
from tabulate import tabulate

import feast.pyspark.aws.jobs
spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER)

jobs = feast.pyspark.aws.jobs.list_jobs(None, None)
if spark_launcher == "emr":
import feast.pyspark.aws.jobs

print(
tabulate(jobs, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain")
)
jobs = feast.pyspark.aws.jobs.list_jobs(None, None)
print(
tabulate(
jobs, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain"
)
)
else:
raise NotImplementedError(
f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}"
)


@cli.command()
Expand Down
2 changes: 0 additions & 2 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@
"protobuf>=3.10",
"PyYAML==5.1.*",
"fastavro>=0.22.11,<0.23",
"kafka-python==1.*",
"tabulate==0.8.*",
"toml==0.10.*",
"tqdm==4.*",
"pyarrow<0.16.0,>=0.15.1",
"numpy",
"google",
"confluent_kafka",
]

# README file from Feast repo root directory
Expand Down