diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index cf1417a354..bd67774e83 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -23,6 +23,7 @@ from feast.client import Client from feast.config import Config +from feast.constants import CONFIG_SPARK_LAUNCHER from feast.entity import Entity from feast.feature_table import FeatureTable from feast.loaders.yaml import yaml_loader @@ -351,78 +352,127 @@ def project_list(): print(tabulate(table, headers=["NAME"], tablefmt="plain")) -@cli.command() +@cli.group(name="jobs") +def job(): + """ + Create and manage jobs + """ + pass + + +@job.command(name="sync-offline-to-online") @click.option( "--feature-table", "-t", - help="Feature table name to ingest data into", + help="Feature table name of data to be synced", + type=click.STRING, required=True, ) @click.option("--start-time", "-s", help="Interval start", required=True) @click.option("--end-time", "-e", help="Interval end", required=True) def sync_offline_to_online(feature_table: str, start_time: str, end_time: str): """ - Sync offline store to online. + Sync offline store data to online store """ - from datetime import datetime - client = Client() - table = client.get_feature_table(feature_table) - client.start_offline_to_online_ingestion( - table, datetime.fromisoformat(start_time), datetime.fromisoformat(end_time) - ) + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs -@cli.command() + client = Client() + table = client.get_feature_table(feature_table) + feast.pyspark.aws.jobs.sync_offline_to_online( + client, table, start_time, end_time + ) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) + + +@job.command(name="start-stream-to-online") @click.option( "--feature-table", "-t", - help="Feature table name to ingest data into", + help="Feature table name of job to be started", + type=click.STRING, required=True, ) @click.option( - "--jar", "-j", help="Feature table name to ingest data into", default="", + "--jar", + "-j", + help="The file path to the uber jar for offline to online ingestion spark job", + default="", ) def start_stream_to_online(feature_table: str, jar: str): """ - Start stream to online sync job. + Start stream to online sync job """ - client = Client() - table = client.get_feature_table(feature_table) - client.start_stream_to_online_ingestion(table, [jar] if jar else []) + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs -@cli.command() + client = Client() + table = client.get_feature_table(feature_table) + feast.pyspark.aws.jobs.start_stream_to_online( + client, table, [jar] if jar else [] + ) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) + + +@job.command(name="stop-stream-to-online") @click.option( "--feature-table", "-t", - help="Feature table name to ingest data into", + help="Feature table name of job to be stopped", + type=click.STRING, required=True, ) def stop_stream_to_online(feature_table: str): """ - Start stream to online sync job. + Stop stream to online sync job """ - import feast.pyspark.aws.jobs - feast.pyspark.aws.jobs.stop_stream_to_online(feature_table) + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs -@cli.command() -def list_emr_jobs(): + feast.pyspark.aws.jobs.stop_stream_to_online(feature_table) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) + + +@job.command() +def list_jobs(): """ - List jobs. + List jobs """ from tabulate import tabulate - import feast.pyspark.aws.jobs + spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) - jobs = feast.pyspark.aws.jobs.list_jobs(None, None) + if spark_launcher == "emr": + import feast.pyspark.aws.jobs - print( - tabulate(jobs, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain") - ) + jobs = feast.pyspark.aws.jobs.list_jobs(None, None) + print( + tabulate( + jobs, headers=feast.pyspark.aws.jobs.JobInfo._fields, tablefmt="plain" + ) + ) + else: + raise NotImplementedError( + f"Feast currently does not provide support for the specified spark launcher: {spark_launcher}" + ) @cli.command() diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 5ad33e4eaa..19e567ecd3 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -39,14 +39,12 @@ "protobuf>=3.10", "PyYAML==5.1.*", "fastavro>=0.22.11,<0.23", - "kafka-python==1.*", "tabulate==0.8.*", "toml==0.10.*", "tqdm==4.*", "pyarrow<0.16.0,>=0.15.1", "numpy", "google", - "confluent_kafka", ] # README file from Feast repo root directory