Enhance job api to return associated feature table and start time #1259

Merged · 8 commits · Jan 12, 2021
58 changes: 46 additions & 12 deletions protos/feast/core/JobService.proto
@@ -73,24 +73,31 @@ message Job {
// Current job status
JobStatus status = 3;
// Deterministic hash of the Job
-  string hash = 8;
+  string hash = 4;
+  // Start time of the Job
+  google.protobuf.Timestamp start_time = 5;

message RetrievalJobMeta {
-    string output_location = 4;
+    string output_location = 1;
}

message OfflineToOnlineMeta {
string table_name = 1;
}

message StreamToOnlineMeta {
string table_name = 1;
}

// JobType specific metadata on the job
oneof meta {
-    RetrievalJobMeta retrieval = 5;
-    OfflineToOnlineMeta batch_ingestion = 6;
-    StreamToOnlineMeta stream_ingestion = 7;
+    RetrievalJobMeta retrieval = 6;
+    OfflineToOnlineMeta batch_ingestion = 7;
+    StreamToOnlineMeta stream_ingestion = 8;
}

// Path to Spark job logs, if available
string log_uri = 9;
}

// Ingest data from offline store into online store
@@ -107,8 +114,17 @@ message StartOfflineToOnlineIngestionJobRequest {
}

message StartOfflineToOnlineIngestionJobResponse {
// Job ID assigned by Feast
string id = 1;

// Job start time
google.protobuf.Timestamp job_start_time = 2;

// Feature table associated with the job
string table_name = 3;

// Path to Spark job logs, if available
string log_uri = 4;
}

message GetHistoricalFeaturesRequest {
@@ -136,9 +152,18 @@ message GetHistoricalFeaturesRequest {
}

message GetHistoricalFeaturesResponse {
// Export Job with ID assigned by Feast
string id = 1;

// URI of the join result output file
string output_file_uri = 2;

// Job start time
google.protobuf.Timestamp job_start_time = 3;

// Path to Spark job logs, if available
string log_uri = 4;

}

message StartStreamToOnlineIngestionJobRequest {
@@ -148,8 +173,17 @@ message StartStreamToOnlineIngestionJobRequest {
}

message StartStreamToOnlineIngestionJobResponse {
// Job ID assigned by Feast
string id = 1;

// Job start time
google.protobuf.Timestamp job_start_time = 2;

// Feature table associated with the job
string table_name = 3;

// Path to Spark job logs, if available
string log_uri = 4;
}

message ListJobsRequest {
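For orientation, here is a minimal sketch of how a gRPC client might consume the new response fields. The generated module paths and the service address are assumptions for illustration, not part of this diff:

    import grpc

    from feast.core.JobService_pb2 import StartOfflineToOnlineIngestionJobRequest
    from feast.core.JobService_pb2_grpc import JobServiceStub

    # Assumed Job Service address; adjust for your deployment.
    channel = grpc.insecure_channel("localhost:6568")
    stub = JobServiceStub(channel)

    # Illustrative project/table names.
    request = StartOfflineToOnlineIngestionJobRequest(
        project="default", table_name="driver_statistics"
    )
    response = stub.StartOfflineToOnlineIngestionJob(request)

    print(response.id)                           # job ID, as before
    print(response.table_name)                   # new: associated feature table
    print(response.job_start_time.ToDatetime())  # new: start time as a datetime
    print(response.log_uri)                      # new: Spark log path, may be empty
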
16 changes: 14 additions & 2 deletions sdk/python/feast/client.py
@@ -1075,6 +1075,8 @@ def get_historical_features(
self._extra_grpc_params,
response.id,
output_file_uri=response.output_file_uri,
start_time=response.job_start_time.ToDatetime(),
log_uri=response.log_uri,
)
else:
return start_historical_feature_retrieval_job(
@@ -1174,7 +1176,12 @@ def start_offline_to_online_ingestion(
request.end_date.FromDatetime(end)
response = self._job_service.StartOfflineToOnlineIngestionJob(request)
return RemoteBatchIngestionJob(
-            self._job_service, self._extra_grpc_params, response.id,
+            self._job_service,
+            self._extra_grpc_params,
+            response.id,
+            feature_table.name,
+            response.job_start_time.ToDatetime(),
+            response.log_uri,
)

def start_stream_to_online_ingestion(
@@ -1196,7 +1203,12 @@ def start_stream_to_online_ingestion(
)
response = self._job_service.StartStreamToOnlineIngestionJob(request)
return RemoteStreamIngestionJob(
-            self._job_service, self._extra_grpc_params, response.id
+            self._job_service,
+            self._extra_grpc_params,
+            response.id,
+            feature_table.name,
+            response.job_start_time.ToDatetime(),
+            response.log_uri,
)

def list_jobs(self, include_terminated: bool) -> List[SparkJob]:
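A hypothetical end-to-end sketch of the updated SDK methods follows; the job service URL, table name, and date range are illustrative, and the accessors correspond to the new SparkJob members introduced in abc.py below:

    from datetime import datetime

    from feast import Client

    client = Client(job_service_url="localhost:6568")  # assumed deployment
    table = client.get_feature_table("driver_statistics")

    job = client.start_offline_to_online_ingestion(
        table, start=datetime(2021, 1, 1), end=datetime(2021, 1, 8)
    )
    print(job.get_id())
    print(job.get_feature_table())  # new: table name carried on the job handle
    print(job.get_start_time())     # new: populated from response.job_start_time
    print(job.get_log_uri())        # new: may be None if the launcher exposes no logs
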
45 changes: 40 additions & 5 deletions sdk/python/feast/job_service.py
@@ -5,9 +5,10 @@
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
-from typing import Dict, List, Tuple
+from typing import Dict, List, Tuple, cast

import grpc
from google.protobuf.timestamp_pb2 import Timestamp

import feast
from feast.constants import ConfigOptions as opt
@@ -54,6 +55,7 @@
def _job_to_proto(spark_job: SparkJob) -> JobProto:
job = JobProto()
job.id = spark_job.get_id()
job.log_uri = cast(str, spark_job.get_log_uri() or "")
status = spark_job.get_status()
if status == SparkJobStatus.COMPLETED:
job.status = JobStatus.JOB_STATUS_DONE
@@ -71,11 +73,15 @@ def _job_to_proto(spark_job: SparkJob) -> JobProto:
job.retrieval.output_location = spark_job.get_output_file_uri(block=False)
elif isinstance(spark_job, BatchIngestionJob):
job.type = JobType.BATCH_INGESTION_JOB
job.batch_ingestion.table_name = spark_job.get_feature_table()
elif isinstance(spark_job, StreamIngestionJob):
job.type = JobType.STREAM_INGESTION_JOB
job.stream_ingestion.table_name = spark_job.get_feature_table()
else:
raise ValueError(f"Invalid job type {job}")

job.start_time.FromDatetime(spark_job.get_start_time())

return job


@@ -97,7 +103,16 @@ def StartOfflineToOnlineIngestionJob(
start=request.start_date.ToDatetime(),
end=request.end_date.ToDatetime(),
)
-        return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())
+
+        job_start_timestamp = Timestamp()
+        job_start_timestamp.FromDatetime(job.get_start_time())
+
+        return StartOfflineToOnlineIngestionJobResponse(
+            id=job.get_id(),
+            job_start_time=job_start_timestamp,
+            table_name=request.table_name,
+            log_uri=job.get_log_uri(),
+        )

def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
"""Produce a training dataset, return a job id that will provide a file reference"""
@@ -114,8 +129,13 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):

output_file_uri = job.get_output_file_uri(block=False)

job_start_timestamp = Timestamp()
job_start_timestamp.FromDatetime(job.get_start_time())

return GetHistoricalFeaturesResponse(
-            id=job.get_id(), output_file_uri=output_file_uri
+            id=job.get_id(),
+            output_file_uri=output_file_uri,
+            job_start_time=job_start_timestamp,
)

def StartStreamToOnlineIngestionJob(
@@ -135,7 +155,14 @@ def StartStreamToOnlineIngestionJob(
job_hash = params.get_job_hash()
for job in list_jobs(include_terminated=True, client=self.client):
if isinstance(job, StreamIngestionJob) and job.get_hash() == job_hash:
-                return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
+                job_start_timestamp = Timestamp()
+                job_start_timestamp.FromDatetime(job.get_start_time())
+                return StartStreamToOnlineIngestionJobResponse(
+                    id=job.get_id(),
+                    job_start_time=job_start_timestamp,
+                    table_name=job.get_feature_table(),
+                    log_uri=job.get_log_uri(),
+                )
raise RuntimeError(
"Feast Job Service has control loop enabled, but couldn't find the existing stream ingestion job for the given FeatureTable"
)
@@ -147,7 +174,15 @@ def StartStreamToOnlineIngestionJob(
feature_table=feature_table,
extra_jars=[],
)
-        return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
+
+        job_start_timestamp = Timestamp()
+        job_start_timestamp.FromDatetime(job.get_start_time())
+        return StartStreamToOnlineIngestionJobResponse(
+            id=job.get_id(),
+            job_start_time=job_start_timestamp,
+            table_name=request.table_name,
+            log_uri=job.get_log_uri(),
+        )

def ListJobs(self, request, context):
"""List all types of jobs"""
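Each handler above repeats the datetime-to-Timestamp conversion. A small module-level helper (not part of this PR; the name is illustrative) would express that logic once:

    from datetime import datetime

    from google.protobuf.timestamp_pb2 import Timestamp


    def _datetime_to_timestamp(dt: datetime) -> Timestamp:
        # Timestamp cannot be constructed from a datetime directly,
        # hence the two-step conversion.
        ts = Timestamp()
        ts.FromDatetime(dt)
        return ts

Each job_start_timestamp block would then collapse to job_start_time=_datetime_to_timestamp(job.get_start_time()).
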
36 changes: 36 additions & 0 deletions sdk/python/feast/pyspark/abc.py
@@ -67,6 +67,18 @@ def cancel(self):
"""
raise NotImplementedError

@abc.abstractmethod
def get_start_time(self) -> datetime:
"""
Get job start time.
"""

def get_log_uri(self) -> Optional[str]:
"""
Get path to Spark job log, if applicable.
"""
return None


class SparkJobParameters(abc.ABC):
@abc.abstractmethod
@@ -496,6 +508,18 @@ class BatchIngestionJob(SparkJob):
Container for the ingestion job result
"""

@abc.abstractmethod
def get_feature_table(self) -> str:
"""
        Get the feature table name associated with this job. Returns an empty
        string if the feature table cannot be determined, for example when the
        job was created by an earlier version of Feast.

Returns:
str: Feature table name
"""
raise NotImplementedError


class StreamIngestionJob(SparkJob):
"""
@@ -513,6 +537,18 @@ def get_hash(self) -> str:
"""
raise NotImplementedError

@abc.abstractmethod
def get_feature_table(self) -> str:
"""
        Get the feature table name associated with this job. Returns an empty
        string if the feature table cannot be determined, for example when the
        job was created by an earlier version of Feast.

Returns:
str: Feature table name
"""
raise NotImplementedError


class JobLauncher(abc.ABC):
"""
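To make the new contract concrete, here is a partial sketch of a launcher-side implementation; the class and attribute names are assumptions, and unrelated abstract members such as get_status and cancel are elided:

    from datetime import datetime
    from typing import Optional

    from feast.pyspark.abc import BatchIngestionJob


    class StandaloneBatchIngestionJob(BatchIngestionJob):
        # Partial sketch: only the members touched by this PR are shown.

        def __init__(
            self,
            job_id: str,
            table_name: str,
            start_time: datetime,
            log_uri: Optional[str] = None,
        ):
            self._job_id = job_id
            self._table_name = table_name
            self._start_time = start_time
            self._log_uri = log_uri

        def get_id(self) -> str:
            return self._job_id

        def get_feature_table(self) -> str:
            # Empty string when unknown, e.g. a job created by an older Feast.
            return self._table_name or ""

        def get_start_time(self) -> datetime:
            return self._start_time

        def get_log_uri(self) -> Optional[str]:
            return self._log_uri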