diff --git a/core/src/main/java/feast/core/dao/JobRepository.java b/core/src/main/java/feast/core/dao/JobRepository.java index 98da76912e..c61f3eacc0 100644 --- a/core/src/main/java/feast/core/dao/JobRepository.java +++ b/core/src/main/java/feast/core/dao/JobRepository.java @@ -16,6 +16,7 @@ */ package feast.core.dao; +import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import java.util.Collection; @@ -29,4 +30,10 @@ public interface JobRepository extends JpaRepository { List findByStatusNotIn(Collection statuses); List findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName); + + // find jobs by feast store name + List findByStoreName(String storeName); + + // find jobs by featureset + List findByFeatureSetsIn(List featureSets); } diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 661bbe2403..42bc0ba23d 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -16,6 +16,7 @@ */ package feast.core.grpc; +import com.google.api.gax.rpc.InvalidArgumentException; import com.google.protobuf.InvalidProtocolBufferException; import feast.core.CoreServiceGrpc.CoreServiceImplBase; import feast.core.CoreServiceProto.ApplyFeatureSetRequest; @@ -30,21 +31,29 @@ import feast.core.CoreServiceProto.GetFeatureSetResponse; import feast.core.CoreServiceProto.ListFeatureSetsRequest; import feast.core.CoreServiceProto.ListFeatureSetsResponse; +import feast.core.CoreServiceProto.ListIngestionJobsRequest; +import feast.core.CoreServiceProto.ListIngestionJobsResponse; import feast.core.CoreServiceProto.ListProjectsRequest; import feast.core.CoreServiceProto.ListProjectsResponse; import feast.core.CoreServiceProto.ListStoresRequest; import feast.core.CoreServiceProto.ListStoresResponse; +import feast.core.CoreServiceProto.RestartIngestionJobRequest; +import feast.core.CoreServiceProto.RestartIngestionJobResponse; +import feast.core.CoreServiceProto.StopIngestionJobRequest; +import feast.core.CoreServiceProto.StopIngestionJobResponse; import feast.core.CoreServiceProto.UpdateStoreRequest; import feast.core.CoreServiceProto.UpdateStoreResponse; import feast.core.exception.RetrievalException; import feast.core.grpc.interceptors.MonitoringInterceptor; import feast.core.model.Project; import feast.core.service.AccessManagementService; +import feast.core.service.JobService; import feast.core.service.SpecService; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.NoSuchElementException; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.lognet.springboot.grpc.GRpcService; @@ -57,11 +66,16 @@ public class CoreServiceImpl extends CoreServiceImplBase { private SpecService specService; private AccessManagementService accessManagementService; + private JobService jobService; @Autowired - public CoreServiceImpl(SpecService specService, AccessManagementService accessManagementService) { + public CoreServiceImpl( + SpecService specService, + AccessManagementService accessManagementService, + JobService jobService) { this.specService = specService; this.accessManagementService = accessManagementService; + this.jobService = jobService; } @Override @@ -192,4 +206,69 @@ public void listProjects( Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); } } + + @Override + public void listIngestionJobs( + ListIngestionJobsRequest request, + StreamObserver responseObserver) { + try { + ListIngestionJobsResponse response = this.jobService.listJobs(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (InvalidArgumentException e) { + log.error("Recieved an invalid request on calling listIngestionJobs method:", e); + responseObserver.onError( + Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException()); + } catch (Exception e) { + log.error("Unexpected exception on calling listIngestionJobs method:", e); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); + } + } + + @Override + public void restartIngestionJob( + RestartIngestionJobRequest request, + StreamObserver responseObserver) { + try { + RestartIngestionJobResponse response = this.jobService.restartJob(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (NoSuchElementException e) { + log.error( + "Attempted to restart an nonexistent job on calling restartIngestionJob method:", e); + responseObserver.onError( + Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException()); + } catch (UnsupportedOperationException e) { + log.error("Recieved an unsupported request on calling restartIngestionJob method:", e); + responseObserver.onError( + Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException()); + } catch (Exception e) { + log.error("Unexpected exception on calling restartIngestionJob method:", e); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); + } + } + + @Override + public void stopIngestionJob( + StopIngestionJobRequest request, StreamObserver responseObserver) { + try { + StopIngestionJobResponse response = this.jobService.stopJob(request); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (NoSuchElementException e) { + log.error("Attempted to stop an nonexistent job on calling stopIngestionJob method:", e); + responseObserver.onError( + Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException()); + } catch (UnsupportedOperationException e) { + log.error("Recieved an unsupported request on calling stopIngestionJob method:", e); + responseObserver.onError( + Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException()); + } catch (Exception e) { + log.error("Unexpected exception on calling stopIngestionJob method:", e); + responseObserver.onError( + Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()); + } + } } diff --git a/core/src/main/java/feast/core/job/JobManager.java b/core/src/main/java/feast/core/job/JobManager.java index 99880cdb76..eda211b757 100644 --- a/core/src/main/java/feast/core/job/JobManager.java +++ b/core/src/main/java/feast/core/job/JobManager.java @@ -51,6 +51,16 @@ public interface JobManager { */ void abortJob(String extId); + /** + * Restart an job. If job is an terminated state, will simply start the job. Might cause data to + * be lost during when restarting running jobs in some implementations. Refer to on docs the + * specific implementation. + * + * @param job job to restart + * @return the restarted job + */ + Job restartJob(Job job); + /** * Get status of a job given runner-specific job ID. * diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index f4df3d352a..c2313d75ec 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -152,6 +152,26 @@ public void abortJob(String dataflowJobId) { } } + /** + * Restart a restart dataflow job. Dataflow should ensure continuity between during the restart, + * so no data should be lost during the restart operation. + * + * @param job job to restart + * @return the restarted job + */ + @Override + public Job restartJob(Job job) { + JobStatus status = job.getStatus(); + if (JobStatus.getTerminalState().contains(status)) { + // job yet not running: just start job + return this.startJob(job); + } else { + // job is running - updating the job without changing the job has + // the effect of restarting the job + return this.updateJob(job); + } + } + /** * Get status of a dataflow job with given id and try to map it into Feast's JobStatus. * diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 08aeed1cc3..9b3a8473e4 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -157,6 +157,26 @@ public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOExcept return ImportJob.runPipeline(pipelineOptions); } + /** + * Restart a direct runner job. Note that some data will be temporarily lost during when + * restarting running direct runner jobs. See {#link {@link #updateJob(Job)} for more info. + * + * @param job job to restart + * @return the restarted job + */ + @Override + public Job restartJob(Job job) { + JobStatus status = job.getStatus(); + if (JobStatus.getTerminalState().contains(status)) { + // job yet not running: just start job + return this.startJob(job); + } else { + // job is running - updating the job without changing the job has + // the effect of restarting the job. + return this.updateJob(job); + } + } + /** * Gets the state of the direct runner job. Direct runner jobs only have 2 states: RUNNING and * ABORTED. diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 377f5f7095..738a16db2d 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -16,8 +16,24 @@ */ package feast.core.model; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.FeatureSetProto; +import feast.core.IngestionJobProto; +import java.util.ArrayList; import java.util.List; -import javax.persistence.*; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.Id; +import javax.persistence.Index; +import javax.persistence.JoinColumn; +import javax.persistence.JoinTable; +import javax.persistence.ManyToMany; +import javax.persistence.ManyToOne; +import javax.persistence.OneToMany; +import javax.persistence.Table; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; @@ -102,4 +118,31 @@ public void updateMetrics(List newMetrics) { public String getSinkName() { return store.getName(); } + + /** + * Convert a job model to ingestion job proto + * + * @return Ingestion Job proto derieved from the given job + */ + public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferException { + + // convert featuresets of job to protos + List featureSetProtos = new ArrayList<>(); + for (FeatureSet featureSet : this.getFeatureSets()) { + featureSetProtos.add(featureSet.toProto()); + } + + // build ingestion job proto with job data + IngestionJobProto.IngestionJob ingestJob = + IngestionJobProto.IngestionJob.newBuilder() + .setId(this.getId()) + .setExternalId(this.getExtId()) + .setStatus(this.getStatus().toProto()) + .addAllFeatureSets(featureSetProtos) + .setSource(this.getSource().toProto()) + .setStore(this.getStore().toProto()) + .build(); + + return ingestJob; + } } diff --git a/core/src/main/java/feast/core/model/JobStatus.java b/core/src/main/java/feast/core/model/JobStatus.java index 123b57a21b..86aa512933 100644 --- a/core/src/main/java/feast/core/model/JobStatus.java +++ b/core/src/main/java/feast/core/model/JobStatus.java @@ -16,9 +16,11 @@ */ package feast.core.model; +import feast.core.IngestionJobProto.IngestionJobStatus; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Map; public enum JobStatus { /** Job status is not known. */ @@ -64,4 +66,39 @@ public enum JobStatus { public static Collection getTerminalState() { return TERMINAL_STATE; } + + private static final Collection TRANSITIONAL_STATES = + Collections.unmodifiableList(Arrays.asList(PENDING, ABORTING, SUSPENDING)); + + /** + * Get Transitional Job Status states. Transitionals states are assigned to jobs that + * transitioning to a more stable state (ie SUSPENDED, ABORTED etc.) + * + * @return Collection of transitional Job Status states. + */ + public static final Collection getTransitionalStates() { + return TRANSITIONAL_STATES; + } + + private static final Map INGESTION_JOB_STATUS_MAP = + Map.of( + JobStatus.UNKNOWN, IngestionJobStatus.UNKNOWN, + JobStatus.PENDING, IngestionJobStatus.PENDING, + JobStatus.RUNNING, IngestionJobStatus.RUNNING, + JobStatus.COMPLETED, IngestionJobStatus.COMPLETED, + JobStatus.ABORTING, IngestionJobStatus.ABORTING, + JobStatus.ABORTED, IngestionJobStatus.ABORTED, + JobStatus.ERROR, IngestionJobStatus.ERROR, + JobStatus.SUSPENDING, IngestionJobStatus.SUSPENDING, + JobStatus.SUSPENDED, IngestionJobStatus.SUSPENDED); + + /** + * Convert a Job Status to Ingestion Job Status proto + * + * @return IngestionJobStatus proto derieved from this job status + */ + public IngestionJobStatus toProto() { + // maps job models job status to ingestion job status + return INGESTION_JOB_STATUS_MAP.get(this); + } } diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java new file mode 100644 index 0000000000..bf74b90e80 --- /dev/null +++ b/core/src/main/java/feast/core/service/JobService.java @@ -0,0 +1,284 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.CoreServiceProto.ListFeatureSetsRequest; +import feast.core.CoreServiceProto.ListFeatureSetsResponse; +import feast.core.CoreServiceProto.ListIngestionJobsRequest; +import feast.core.CoreServiceProto.ListIngestionJobsResponse; +import feast.core.CoreServiceProto.RestartIngestionJobRequest; +import feast.core.CoreServiceProto.RestartIngestionJobResponse; +import feast.core.CoreServiceProto.StopIngestionJobRequest; +import feast.core.CoreServiceProto.StopIngestionJobResponse; +import feast.core.FeatureSetReferenceProto.FeatureSetReference; +import feast.core.IngestionJobProto; +import feast.core.dao.JobRepository; +import feast.core.job.JobManager; +import feast.core.log.Action; +import feast.core.log.AuditLogger; +import feast.core.log.Resource; +import feast.core.model.FeatureSet; +import feast.core.model.Job; +import feast.core.model.JobStatus; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** Defines a Job Managemenent Service that allows users to manage feast ingestion jobs. */ +@Slf4j +@Service +public class JobService { + private JobRepository jobRepository; + private SpecService specService; + private Map jobManagers; + + @Autowired + public JobService( + JobRepository jobRepository, SpecService specService, List jobManagerList) { + this.jobRepository = jobRepository; + this.specService = specService; + + this.jobManagers = new HashMap<>(); + for (JobManager manager : jobManagerList) { + this.jobManagers.put(manager.getRunnerType().name(), manager); + } + } + + /* Job Service API */ + /** + * List Ingestion Jobs in feast matching the given request. See CoreService protobuf documentation + * for more detailed documentation. + * + * @param request list ingestion jobs request specifying which jobs to include + * @throws IllegalArgumentException when given filter in a unsupported configuration + * @throws InvalidProtocolBufferException on error when constructing response protobuf + * @return list ingestion jobs response + */ + @Transactional(readOnly = true) + public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) + throws InvalidProtocolBufferException { + Set matchingJobIds = new HashSet<>(); + + // check that filter specified and not empty + if (request.hasFilter() + && !(request.getFilter().getId() == "" + && request.getFilter().getStoreName() == "" + && request.getFilter().hasFeatureSetReference() == false)) { + // filter jobs based on request filter + ListIngestionJobsRequest.Filter filter = request.getFilter(); + + // for proto3, default value for missing values: + // - numeric values (ie int) is zero + // - strings is empty string + if (filter.getId() != "") { + // get by id: no more filters required: found job + Optional job = this.jobRepository.findById(filter.getId()); + if (job.isPresent()) { + matchingJobIds.add(filter.getId()); + } + } else { + // multiple filters can apply together in an 'and' operation + if (filter.getStoreName() != "") { + // find jobs by name + List jobs = this.jobRepository.findByStoreName(filter.getStoreName()); + Set jobIds = jobs.stream().map(Job::getId).collect(Collectors.toSet()); + matchingJobIds = this.mergeResults(matchingJobIds, jobIds); + } + if (filter.hasFeatureSetReference()) { + // find a matching featuresets for reference + FeatureSetReference fsReference = filter.getFeatureSetReference(); + ListFeatureSetsResponse response = + this.specService.listFeatureSets(this.toListFeatureSetFilter(fsReference)); + List featureSets = + response.getFeatureSetsList().stream() + .map(FeatureSet::fromProto) + .collect(Collectors.toList()); + + // find jobs for the matching featuresets + Collection matchingJobs = this.jobRepository.findByFeatureSetsIn(featureSets); + List jobIds = matchingJobs.stream().map(Job::getId).collect(Collectors.toList()); + matchingJobIds = this.mergeResults(matchingJobIds, jobIds); + } + } + } else { + // no or empty filter: match all jobs + matchingJobIds = + this.jobRepository.findAll().stream().map(Job::getId).collect(Collectors.toSet()); + } + + // convert matching job models to ingestion job protos + List ingestJobs = new ArrayList<>(); + for (String jobId : matchingJobIds) { + Job job = this.jobRepository.findById(jobId).get(); + ingestJobs.add(job.toProto()); + } + + // pack jobs into response + return ListIngestionJobsResponse.newBuilder().addAllJobs(ingestJobs).build(); + } + + /** + * Restart (Aborts) the ingestion job matching the given restart request. See CoreService protobuf + * documentation for more detailed documentation. + * + * @param request restart ingestion job request specifying which job to stop + * @throws NoSuchElementException when restart job request requests to restart a nonexistent job. + * @throws UnsupportedOperationException when job to be restarted is in an unsupported status + * @throws InvalidProtocolBufferException on error when constructing response protobuf + */ + @Transactional + public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request) + throws InvalidProtocolBufferException { + // check job exists + Optional getJob = this.jobRepository.findById(request.getId()); + if (getJob.isEmpty()) { + throw new NoSuchElementException( + "Attempted to stop nonexistent job with id: " + getJob.get().getId()); + } + + // check job status is valid for restarting + Job job = getJob.get(); + JobStatus status = job.getStatus(); + if (JobStatus.getTransitionalStates().contains(status) + || JobStatus.getTerminalState().contains(status) + || status.equals(JobStatus.UNKNOWN)) { + throw new UnsupportedOperationException( + "Restarting a job with a transitional, terminal or unknown status is unsupported"); + } + + // restart job with job manager + JobManager jobManager = this.jobManagers.get(job.getRunner()); + job = jobManager.restartJob(job); + log.info( + String.format( + "Restarted job (id: %s, extId: %s runner: %s)", + job.getId(), job.getExtId(), job.getRunner())); + // sync job status & update job model in job repository + job = this.syncJobStatus(jobManager, job); + this.jobRepository.saveAndFlush(job); + + return RestartIngestionJobResponse.newBuilder().build(); + } + + /** + * Stops (Aborts) the ingestion job matching the given stop request. See CoreService protobuf + * documentation for more detailed documentation. + * + * @param request stop ingestion job request specifying which job to stop + * @throws NoSuchElementException when stop job request requests to stop a nonexistent job. + * @throws UnsupportedOperationException when job to be stopped is in an unsupported status + * @throws InvalidProtocolBufferException on error when constructing response protobuf + */ + @Transactional + public StopIngestionJobResponse stopJob(StopIngestionJobRequest request) + throws InvalidProtocolBufferException { + // check job exists + Optional getJob = this.jobRepository.findById(request.getId()); + if (getJob.isEmpty()) { + throw new NoSuchElementException( + "Attempted to stop nonexistent job with id: " + getJob.get().getId()); + } + + // check job status is valid for stopping + Job job = getJob.get(); + JobStatus status = job.getStatus(); + if (JobStatus.getTerminalState().contains(status)) { + // do nothing - job is already stopped + return StopIngestionJobResponse.newBuilder().build(); + } else if (JobStatus.getTransitionalStates().contains(status) + || status.equals(JobStatus.UNKNOWN)) { + throw new UnsupportedOperationException( + "Stopping a job with a transitional or unknown status is unsupported"); + } + + // stop job with job manager + JobManager jobManager = this.jobManagers.get(job.getRunner()); + jobManager.abortJob(job.getExtId()); + log.info( + String.format( + "Restarted job (id: %s, extId: %s runner: %s)", + job.getId(), job.getExtId(), job.getRunner())); + + // sync job status & update job model in job repository + job = this.syncJobStatus(jobManager, job); + this.jobRepository.saveAndFlush(job); + + return StopIngestionJobResponse.newBuilder().build(); + } + + /* Private Utility Methods */ + private Set mergeResults(Set results, Collection newResults) { + if (results.size() <= 0) { + // no existing results: copy over new results + results.addAll(newResults); + } else { + // and operation: keep results that exist in both existing and new results + results.retainAll(newResults); + } + return results; + } + + // converts feature set reference to a list feature set filter + private ListFeatureSetsRequest.Filter toListFeatureSetFilter(FeatureSetReference fsReference) { + // match featuresets using contents of featureset reference + String fsName = fsReference.getName(); + String fsProject = fsReference.getProject(); + Integer fsVersion = fsReference.getVersion(); + + // construct list featureset request filter using feature set reference + // for proto3, default value for missing values: + // - numeric values (ie int) is zero + // - strings is empty string + ListFeatureSetsRequest.Filter filter = + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName((fsName != "") ? fsName : "*") + .setProject((fsProject != "") ? fsProject : "*") + .setFeatureSetVersion((fsVersion != 0) ? fsVersion.toString() : "*") + .build(); + + return filter; + } + + // sync job status using job manager + private Job syncJobStatus(JobManager jobManager, Job job) { + JobStatus newStatus = jobManager.getJobStatus(job); + // log job status transition + if (newStatus != job.getStatus()) { + AuditLogger.log( + Resource.JOB, + job.getId(), + Action.STATUS_CHANGE, + "Job status transition: changed from %s to %s", + job.getStatus(), + newStatus); + job.setStatus(newStatus); + } + return job; + } +} diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java new file mode 100644 index 0000000000..c0e90ca43f --- /dev/null +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -0,0 +1,444 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.CoreServiceProto.ListFeatureSetsRequest; +import feast.core.CoreServiceProto.ListFeatureSetsResponse; +import feast.core.CoreServiceProto.ListIngestionJobsRequest; +import feast.core.CoreServiceProto.ListIngestionJobsResponse; +import feast.core.CoreServiceProto.RestartIngestionJobRequest; +import feast.core.CoreServiceProto.RestartIngestionJobResponse; +import feast.core.CoreServiceProto.StopIngestionJobRequest; +import feast.core.CoreServiceProto.StopIngestionJobResponse; +import feast.core.FeatureSetProto.FeatureSetStatus; +import feast.core.FeatureSetReferenceProto.FeatureSetReference; +import feast.core.IngestionJobProto.IngestionJob; +import feast.core.SourceProto.KafkaSourceConfig; +import feast.core.SourceProto.SourceType; +import feast.core.StoreProto.Store.RedisConfig; +import feast.core.StoreProto.Store.StoreType; +import feast.core.dao.JobRepository; +import feast.core.job.JobManager; +import feast.core.job.Runner; +import feast.core.model.FeatureSet; +import feast.core.model.Field; +import feast.core.model.Job; +import feast.core.model.JobStatus; +import feast.core.model.Source; +import feast.core.model.Store; +import feast.types.ValueProto.ValueType.Enum; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +public class JobServiceTest { + // mocks + @Mock private JobRepository jobRepository; + @Mock private JobManager jobManager; + @Mock private SpecService specService; + // fake models + private Source dataSource; + private Store dataStore; + private FeatureSet featureSet; + private List fsReferences; + private List listFilters; + private Job job; + private IngestionJob ingestionJob; + // test target + public JobService jobService; + + /* unit test setup */ + @Before + public void setup() { + initMocks(this); + + // create mock objects for testing + // fake data source + this.dataSource = + new Source( + SourceType.KAFKA, + KafkaSourceConfig.newBuilder() + .setBootstrapServers("kafka:9092") + .setTopic("my-topic") + .build(), + true); + // fake data store + this.dataStore = + new Store( + "feast-redis", + StoreType.REDIS.toString(), + RedisConfig.newBuilder().setPort(6379).build().toByteArray(), + "*:*:*"); + + // fake featureset & job + this.featureSet = this.newDummyFeatureSet("food", 2, "hunger"); + this.job = this.newDummyJob("kafka-to-redis", "job-1111", JobStatus.PENDING); + try { + this.ingestionJob = this.job.toProto(); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + + this.fsReferences = this.newDummyFeatureSetReferences(); + this.listFilters = this.newDummyListRequestFilters(); + + // setup mock objects + this.setupSpecService(); + this.setupJobRepository(); + this.setupJobManager(); + + // create test target + this.jobService = + new JobService(this.jobRepository, this.specService, Arrays.asList(this.jobManager)); + } + + // setup fake spec service + public void setupSpecService() { + try { + ListFeatureSetsResponse response = + ListFeatureSetsResponse.newBuilder().addFeatureSets(this.featureSet.toProto()).build(); + + when(this.specService.listFeatureSets(this.listFilters.get(0))).thenReturn(response); + + when(this.specService.listFeatureSets(this.listFilters.get(1))).thenReturn(response); + + when(this.specService.listFeatureSets(this.listFilters.get(2))).thenReturn(response); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + fail("Unexpected exception"); + } + } + + // setup fake job repository + public void setupJobRepository() { + when(this.jobRepository.findById(this.job.getId())).thenReturn(Optional.of(this.job)); + when(this.jobRepository.findByStoreName(this.dataStore.getName())) + .thenReturn(Arrays.asList(this.job)); + when(this.jobRepository.findByFeatureSetsIn(Arrays.asList(this.featureSet))) + .thenReturn(Arrays.asList(this.job)); + when(this.jobRepository.findAll()).thenReturn(Arrays.asList(this.job)); + } + + // TODO: setup fake job manager + public void setupJobManager() { + when(this.jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); + when(this.jobManager.restartJob(this.job)) + .thenReturn(this.newDummyJob(this.job.getId(), this.job.getExtId(), JobStatus.PENDING)); + } + + // dummy model constructorss + private FeatureSet newDummyFeatureSet(String name, int version, String project) { + Field feature = new Field(name + "_feature", Enum.INT64); + Field entity = new Field(name + "_entity", Enum.STRING); + + FeatureSet fs = + new FeatureSet( + name, + project, + version, + 100L, + Arrays.asList(entity), + Arrays.asList(feature), + this.dataSource, + FeatureSetStatus.STATUS_READY); + fs.setCreated(Date.from(Instant.ofEpochSecond(10L))); + return fs; + } + + private Job newDummyJob(String id, String extId, JobStatus status) { + return new Job( + id, + extId, + Runner.DATAFLOW.name(), + this.dataSource, + this.dataStore, + Arrays.asList(this.featureSet), + status); + } + + private List newDummyFeatureSetReferences() { + return Arrays.asList( + // all provided: name, version and project + FeatureSetReference.newBuilder() + .setVersion(this.featureSet.getVersion()) + .setName(this.featureSet.getName()) + .setProject(this.featureSet.getProject().toString()) + .build(), + + // name and project + FeatureSetReference.newBuilder() + .setName(this.featureSet.getName()) + .setProject(this.featureSet.getProject().toString()) + .build(), + + // name and version + FeatureSetReference.newBuilder() + .setName(this.featureSet.getName()) + .setVersion(this.featureSet.getVersion()) + .build()); + } + + private List newDummyListRequestFilters() { + return Arrays.asList( + // all provided: name, version and project + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(this.featureSet.getName()) + .setProject(this.featureSet.getProject().toString()) + .setFeatureSetVersion(String.valueOf(this.featureSet.getVersion())) + .build(), + + // name and project + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(this.featureSet.getName()) + .setProject(this.featureSet.getProject().toString()) + .setFeatureSetVersion("*") + .build(), + + // name and project + ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(this.featureSet.getName()) + .setProject("*") + .setFeatureSetVersion(String.valueOf(this.featureSet.getVersion())) + .build()); + } + + /* unit tests */ + private ListIngestionJobsResponse tryListJobs(ListIngestionJobsRequest request) { + ListIngestionJobsResponse response = null; + try { + response = this.jobService.listJobs(request); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + fail("Caught Unexpected exception"); + } + + return response; + } + + // list jobs + @Test + public void testListJobsById() { + ListIngestionJobsRequest.Filter filter = + ListIngestionJobsRequest.Filter.newBuilder().setId(this.job.getId()).build(); + ListIngestionJobsRequest request = + ListIngestionJobsRequest.newBuilder().setFilter(filter).build(); + assertThat(this.tryListJobs(request).getJobs(0), equalTo(this.ingestionJob)); + + // list with no filter + request = ListIngestionJobsRequest.newBuilder().build(); + assertThat(this.tryListJobs(request).getJobs(0), equalTo(this.ingestionJob)); + + // list with empty filter + filter = ListIngestionJobsRequest.Filter.newBuilder().build(); + request = ListIngestionJobsRequest.newBuilder().setFilter(filter).build(); + assertThat(this.tryListJobs(request).getJobs(0), equalTo(this.ingestionJob)); + } + + @Test + public void testListJobsByStoreName() { + ListIngestionJobsRequest.Filter filter = + ListIngestionJobsRequest.Filter.newBuilder().setStoreName(this.dataStore.getName()).build(); + ListIngestionJobsRequest request = + ListIngestionJobsRequest.newBuilder().setFilter(filter).build(); + assertThat(this.tryListJobs(request).getJobs(0), equalTo(this.ingestionJob)); + } + + @Test + public void testListIngestionJobByFeatureSetReference() { + // list job by feature set reference: name and version and project + ListIngestionJobsRequest.Filter filter = + ListIngestionJobsRequest.Filter.newBuilder() + .setFeatureSetReference(this.fsReferences.get(0)) + .setId(this.job.getId()) + .build(); + ListIngestionJobsRequest request = + ListIngestionJobsRequest.newBuilder().setFilter(filter).build(); + assertThat(this.tryListJobs(request).getJobs(0), equalTo(this.ingestionJob)); + + // list job by feature set reference: name and version + filter = + ListIngestionJobsRequest.Filter.newBuilder() + .setFeatureSetReference(this.fsReferences.get(1)) + .setId(this.job.getId()) + .build(); + request = ListIngestionJobsRequest.newBuilder().setFilter(filter).build(); + assertThat(this.tryListJobs(request).getJobs(0), equalTo(this.ingestionJob)); + + // list job by feature set reference: name and project + filter = + ListIngestionJobsRequest.Filter.newBuilder() + .setFeatureSetReference(this.fsReferences.get(2)) + .setId(this.job.getId()) + .build(); + request = ListIngestionJobsRequest.newBuilder().setFilter(filter).build(); + assertThat(this.tryListJobs(request).getJobs(0), equalTo(this.ingestionJob)); + } + + // stop jobs + private StopIngestionJobResponse tryStopJob( + StopIngestionJobRequest request, boolean expectError) { + StopIngestionJobResponse response = null; + try { + response = this.jobService.stopJob(request); + // expected exception, but none was thrown + if (expectError) { + fail("Expected exception, but none was thrown"); + } + } catch (Exception e) { + if (expectError != true) { + // unexpected exception + e.printStackTrace(); + fail("Caught Unexpected exception trying to restart job"); + } + } + + return response; + } + + @Test + public void testStopJobForId() { + JobStatus prevStatus = this.job.getStatus(); + this.job.setStatus(JobStatus.RUNNING); + + StopIngestionJobRequest request = + StopIngestionJobRequest.newBuilder().setId(this.job.getId()).build(); + this.tryStopJob(request, false); + verify(this.jobManager).abortJob(this.job.getExtId()); + + // TODO: check that for job status change in featureset source + + this.job.setStatus(prevStatus); + } + + @Test + public void testStopAlreadyStop() { + // check that stop jobs does not trying to stop jobs that are not already stopped + List doNothingStatuses = new ArrayList<>(); + doNothingStatuses.addAll(JobStatus.getTerminalState()); + + JobStatus prevStatus = this.job.getStatus(); + for (JobStatus status : doNothingStatuses) { + this.job.setStatus(status); + + StopIngestionJobRequest request = + StopIngestionJobRequest.newBuilder().setId(this.job.getId()).build(); + this.tryStopJob(request, false); + + verify(this.jobManager, never()).abortJob(this.job.getExtId()); + } + + this.job.setStatus(prevStatus); + } + + @Test + public void testStopUnsupportedError() { + // check for UnsupportedOperationException when trying to stop jobs are + // in an in unknown or in a transitional state + JobStatus prevStatus = this.job.getStatus(); + List unsupportedStatuses = new ArrayList<>(); + unsupportedStatuses.addAll(JobStatus.getTransitionalStates()); + unsupportedStatuses.add(JobStatus.UNKNOWN); + + for (JobStatus status : unsupportedStatuses) { + this.job.setStatus(status); + + StopIngestionJobRequest request = + StopIngestionJobRequest.newBuilder().setId(this.job.getId()).build(); + this.tryStopJob(request, true); + } + + this.job.setStatus(prevStatus); + } + + // restart jobs + private RestartIngestionJobResponse tryRestartJob( + RestartIngestionJobRequest request, boolean expectError) { + RestartIngestionJobResponse response = null; + try { + response = this.jobService.restartJob(request); + // expected exception, but none was thrown + if (expectError) { + fail("Expected exception, but none was thrown"); + } + } catch (Exception e) { + if (expectError != true) { + // unexpected exception + e.printStackTrace(); + fail("Caught Unexpected exception trying to stop job"); + } + } + + return response; + } + + @Test + public void testRestartJobForId() { + JobStatus prevStatus = this.job.getStatus(); + + // restart running job + this.job.setStatus(JobStatus.RUNNING); + RestartIngestionJobRequest request = + RestartIngestionJobRequest.newBuilder().setId(this.job.getId()).build(); + this.tryRestartJob(request, false); + + // restart terminated job + this.job.setStatus(JobStatus.SUSPENDED); + request = RestartIngestionJobRequest.newBuilder().setId(this.job.getId()).build(); + this.tryRestartJob(request, false); + + verify(this.jobManager, times(2)).restartJob(this.job); + verify(this.jobRepository, times(2)).saveAndFlush(this.job); + + this.job.setStatus(prevStatus); + } + + @Test + public void testRestartUnsupportedError() { + // check for UnsupportedOperationException when trying to restart jobs are + // in an in unknown or in a transitional state + JobStatus prevStatus = this.job.getStatus(); + List unsupportedStatuses = new ArrayList<>(); + unsupportedStatuses.addAll(JobStatus.getTransitionalStates()); + unsupportedStatuses.add(JobStatus.UNKNOWN); + + for (JobStatus status : unsupportedStatuses) { + this.job.setStatus(status); + + RestartIngestionJobRequest request = + RestartIngestionJobRequest.newBuilder().setId(this.job.getId()).build(); + this.tryRestartJob(request, true); + } + + this.job.setStatus(prevStatus); + } +} diff --git a/protos/feast/core/CoreService.proto b/protos/feast/core/CoreService.proto index 35b96e1789..b7760d0b9a 100644 --- a/protos/feast/core/CoreService.proto +++ b/protos/feast/core/CoreService.proto @@ -24,6 +24,8 @@ option java_package = "feast.core"; import "feast/core/FeatureSet.proto"; import "feast/core/Store.proto"; +import "feast/core/FeatureSetReference.proto"; +import "feast/core/IngestionJob.proto"; service CoreService { // Retrieve version information about this Feast deployment @@ -73,6 +75,24 @@ service CoreService { // Lists all projects active projects. rpc ListProjects (ListProjectsRequest) returns (ListProjectsResponse); + + // List Ingestion Jobs given an optional filter. + // Returns allow ingestions matching the given request filter. + // Returns all ingestion jobs if no filter is provided. + // Returns an empty list if no ingestion jobs match the filter. + rpc ListIngestionJobs(ListIngestionJobsRequest) returns (ListIngestionJobsResponse); + + // Restart an Ingestion Job. Restarts the ingestion job with the given job id. + // NOTE: Data might be lost during the restart for some job runners. + // Does not support stopping a job in a transitional (ie pending, suspending, aborting), + // terminal state (ie suspended or aborted) or unknown status + rpc RestartIngestionJob(RestartIngestionJobRequest) returns (RestartIngestionJobResponse); + + // Stop an Ingestion Job. Stop (Aborts) the ingestion job with the given job id. + // Does nothing if the target job if already in a terminal state (ie suspended or aborted). + // Does not support stopping a job in a transitional (ie pending, suspending, aborting) or unknown status + rpc StopIngestionJob(StopIngestionJobRequest) returns (StopIngestionJobResponse); + } // Request for a single feature set @@ -215,4 +235,42 @@ message ListProjectsRequest { message ListProjectsResponse { // List of project names (archived projects are filtered out) repeated string projects = 1; -} \ No newline at end of file +} + +// Request for listing ingestion jobs +message ListIngestionJobsRequest { + Filter filter = 1; + + message Filter { + // Filter by Job ID assigned by Feast + string id = 1; + // Filter by ingestion job target feature set. + FeatureSetReference feature_set_reference = 2; + // Filter by Name of store + string store_name = 3; + } +} + +// Response from listing ingestion jobs +message ListIngestionJobsResponse { + repeated IngestionJob jobs = 1; +} + +// Request to restart ingestion job +message RestartIngestionJobRequest { + // Job ID assigned by Feast + string id = 1; +} + +// Response from restartingan injestion job +message RestartIngestionJobResponse {} + + +// Request to stop ingestion job +message StopIngestionJobRequest { + // Job ID assigned by Feast + string id = 1; +} + +// Request from stopping an ingestion job +message StopIngestionJobResponse {} diff --git a/protos/feast/core/FeatureSetReference.proto b/protos/feast/core/FeatureSetReference.proto new file mode 100644 index 0000000000..2501ec0931 --- /dev/null +++ b/protos/feast/core/FeatureSetReference.proto @@ -0,0 +1,33 @@ +// +// Copyright 2020 The Feast Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +syntax = "proto3"; + +package feast.core; + +option go_package = "github.com/gojek/feast/sdk/go/protos/feast/core"; +option java_outer_classname = "FeatureSetReferenceProto"; +option java_package = "feast.core"; + +// Defines a composite key that refers to a unique FeatureSet +message FeatureSetReference { + // Name of the project + string project = 1; + // Name of the FeatureSet + string name = 2; + // Version no. of the FeatureSet + int32 version = 3; +} diff --git a/protos/feast/core/IngestionJob.proto b/protos/feast/core/IngestionJob.proto new file mode 100644 index 0000000000..68af28c076 --- /dev/null +++ b/protos/feast/core/IngestionJob.proto @@ -0,0 +1,65 @@ +// +// Copyright 2020 The Feast Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +syntax = "proto3"; + +package feast.core; + +option go_package = "github.com/gojek/feast/sdk/go/protos/feast/core"; +option java_outer_classname = "IngestionJobProto"; +option java_package = "feast.core"; + +import "feast/core/FeatureSet.proto"; +import "feast/core/Store.proto"; +import "feast/core/Source.proto"; + +// Represents Feast Injestion Job +message IngestionJob { + // Job ID assigned by Feast + string id = 1; + // External job ID specific to the runner. + // For DirectRunner jobs, this is identical to id. For DataflowRunner jobs, this refers to the Dataflow job ID. + string external_id = 2; + IngestionJobStatus status = 3; + // List of feature sets whose features are populated by this job. + repeated feast.core.FeatureSet feature_sets = 4; + // Source this job is reading from. + feast.core.Source source = 5; + // Store this job is writing to. + feast.core.Store store = 6; +} + +// Status of a Feast Ingestion Job +enum IngestionJobStatus { + // Job status is not known. + UNKNOWN = 0; + // Import job is submitted to runner and currently pending for executing + PENDING = 1; + // Import job is currently running in the runner + RUNNING = 2; + // Runner's reported the import job has completed (applicable to batch job) + COMPLETED = 3; + // When user sent abort command, but it's still running + ABORTING = 4; + // User initiated abort job + ABORTED = 5; + // Runner's reported that the import job failed to run or there is a failure during job + ERROR = 6; + // job has been suspended and waiting for cleanup + SUSPENDING = 7; + // job has been suspended + SUSPENDED = 8; +} diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index dc4784b302..cd1146b481 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -22,8 +22,9 @@ from feast.client import Client from feast.config import Config -from feast.feature_set import FeatureSet +from feast.feature_set import FeatureSet, FeatureSetRef from feast.loaders.yaml import yaml_loader +from feast.core.IngestionJob_pb2 import IngestionJobStatus _logger = logging.getLogger(__name__) @@ -128,11 +129,11 @@ def feature_set_list(): table = [] for fs in feast_client.list_feature_sets(): - table.append([fs.name, fs.version]) + table.append([fs.name, fs.version, repr(fs)]) from tabulate import tabulate - print(tabulate(table, headers=["NAME", "VERSION"], tablefmt="plain")) + print(tabulate(table, headers=["NAME", "VERSION", "REFERENCE"], tablefmt="plain")) @feature_set.command("apply") @@ -214,6 +215,116 @@ def project_list(): print(tabulate(table, headers=["NAME"], tablefmt="plain")) +@cli.group(name="ingest-jobs") +def ingest_job(): + """ + Manage ingestion jobs + """ + pass + + +@ingest_job.command("list") +@click.option("--job-id", "-i", help="Show only ingestion jobs with the given job id") +@click.option( + "--feature-set-ref", + "-f", + help="Show only ingestion job targeting the feature set with the given reference", +) +@click.option( + "--store-name", + "-s", + help="List only ingestion job that ingest into feast store with given name", +) +# TODO: types +def ingest_job_list(job_id, feature_set_ref, store_name): + """ + List ingestion jobs + """ + # parse feature set reference + if feature_set_ref is not None: + feature_set_ref = FeatureSetRef.from_str(feature_set_ref) + + # pull & render ingestion jobs as a table + feast_client = Client() + table = [] + for ingest_job in feast_client.list_ingest_jobs( + job_id=job_id, feature_set_ref=feature_set_ref, store_name=store_name + ): + table.append([ingest_job.id, IngestionJobStatus.Name(ingest_job.status)]) + + from tabulate import tabulate + + print(tabulate(table, headers=["ID", "STATUS"], tablefmt="plain")) + + +@ingest_job.command("describe") +@click.argument("job_id") +def ingest_job_describe(job_id: str): + """ + Describe the ingestion job with the given id. + """ + # find ingestion job for id + feast_client = Client() + jobs = feast_client.list_ingest_jobs(job_id=job_id) + if len(jobs) < 1: + print(f"Ingestion Job with id {job_id} could not be found") + sys.exit(1) + job = jobs[0] + + # pretty render ingestion job as yaml + print( + yaml.dump(yaml.safe_load(str(job)), default_flow_style=False, sort_keys=False) + ) + + +@ingest_job.command("stop") +@click.option( + "--wait", "-w", is_flag=True, help="Wait for the ingestion job to fully stop." +) +@click.option( + "--timeout", + "-t", + default=600, + help="Timeout in seconds to wait for the job to stop.", +) +@click.argument("job_id") +def ingest_job_stop(wait: bool, timeout: int, job_id: str): + """ + Stop ingestion job for id. + """ + # find ingestion job for id + feast_client = Client() + jobs = feast_client.list_ingest_jobs(job_id=job_id) + if len(jobs) < 1: + print(f"Ingestion Job with id {job_id} could not be found") + sys.exit(1) + job = jobs[0] + + feast_client.stop_ingest_job(job) + + # wait for ingestion job to stop + if wait: + job.wait(IngestionJobStatus.ABORTED, timeout=timeout) + + +@ingest_job.command("restart") +@click.argument("job_id") +def ingest_job_restart(job_id: str): + """ + Restart job for id. + Waits for the job to fully restart. + """ + # find ingestion job for id + feast_client = Client() + jobs = feast_client.list_ingest_jobs(job_id=job_id) + if len(jobs) < 1: + print(f"Ingestion Job with id {job_id} could not be found") + sys.exit(1) + job = jobs[0] + + feast_client.restart_ingest_job(job) + + @cli.command() @click.option( "--name", "-n", help="Feature set name to ingest data into", required=True diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 2a0b636b37..f5aed118cf 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -50,11 +50,14 @@ ListFeatureSetsResponse, ListProjectsRequest, ListProjectsResponse, + ListIngestionJobsRequest, + RestartIngestionJobRequest, + StopIngestionJobRequest, ) from feast.core.CoreService_pb2_grpc import CoreServiceStub from feast.core.FeatureSet_pb2 import FeatureSetStatus -from feast.feature_set import Entity, FeatureSet -from feast.job import Job +from feast.feature_set import Entity, FeatureSet, FeatureSetRef +from feast.job import RetrievalJob, IngestJob from feast.loaders.abstract_producer import get_producer from feast.loaders.file import export_source_to_staging_location from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT, get_feature_row_chunks @@ -416,7 +419,7 @@ def list_feature_sets( Args: project: Filter feature sets based on project name name: Filter feature sets based on feature set name - version: Filter feature sets based on version number + version: Filter feature sets based on version numbf, Returns: List of feature sets @@ -507,7 +510,7 @@ def get_batch_features( feature_refs: List[str], entity_rows: Union[pd.DataFrame, str], default_project: str = None, - ) -> Job: + ) -> RetrievalJob: """ Retrieves historical features from a Feast Serving deployment. @@ -525,8 +528,8 @@ def get_batch_features( default_project: Default project where feature values will be found. Returns: - feast.job.Job: - Returns a job object that can be used to monitor retrieval + feast.job.RetrievalJob: + Returns a retrival job object that can be used to monitor retrieval progress asynchronously, and can be used to materialize the results. @@ -606,7 +609,7 @@ def get_batch_features( # Retrieve Feast Job object to manage life cycle of retrieval response = self._serving_service_stub.GetBatchFeatures(request) - return Job(response.job, self._serving_service_stub) + return RetrievalJob(response.job, self._serving_service_stub) def get_online_features( self, @@ -648,6 +651,72 @@ def get_online_features( ) ) + def list_ingest_jobs( + self, + job_id: str = None, + feature_set_ref: FeatureSetRef = None, + store_name: str = None, + ): + """ + List the ingestion jobs currently registered in Feast, with optional filters. + Provides detailed metadata about each ingestion job. + + Args: + job_id: Select specific ingestion job with the given job_id + feature_set_ref: Filter ingestion jobs by target feature set (via reference) + store_name: Filter ingestion jobs by target feast store's name + + Returns: + List of IngestJobs matching the given filters + """ + self._connect_core() + # construct list request + feature_set_ref = None + list_filter = ListIngestionJobsRequest.Filter( + id=job_id, feature_set_reference=feature_set_ref, store_name=store_name, + ) + request = ListIngestionJobsRequest(filter=list_filter) + # make list request & unpack response + response = self._core_service_stub.ListIngestionJobs(request) + ingest_jobs = [ + IngestJob(proto, self._core_service_stub) for proto in response.jobs + ] + return ingest_jobs + + def restart_ingest_job(self, job: IngestJob): + """ + Restart ingestion job currently registered in Feast. + NOTE: Data might be lost during the restart for some job runners. + Does not support stopping a job in a transitional (ie pending, suspending, aborting), + terminal state (ie suspended or aborted) or unknown status + + Args: + job: IngestJob to restart + """ + self._connect_core() + request = RestartIngestionJobRequest(id=job.id) + try: + self._core_service_stub.RestartIngestionJob(request) + except grpc.RpcError as e: + raise grpc.RpcError(e.details()) + + def stop_ingest_job(self, job: IngestJob): + """ + Stop ingestion job currently resgistered in Feast + Does nothing if the target job if already in a terminal state (ie suspended or aborted). + Does not support stopping a job in a transitional (ie pending, suspending, aborting) + or in a unknown status + + Args: + job: IngestJob to restart + """ + self._connect_core() + request = StopIngestionJobRequest(id=job.id) + try: + self._core_service_stub.StopIngestionJob(request) + except grpc.RpcError as e: + raise grpc.RpcError(e.details()) + def ingest( self, feature_set: Union[str, FeatureSet], diff --git a/sdk/python/feast/feature_set.py b/sdk/python/feast/feature_set.py index 4ebfecf167..c4cedaf6b2 100644 --- a/sdk/python/feast/feature_set.py +++ b/sdk/python/feast/feature_set.py @@ -27,6 +27,9 @@ from feast.core.FeatureSet_pb2 import FeatureSet as FeatureSetProto from feast.core.FeatureSet_pb2 import FeatureSetMeta as FeatureSetMetaProto from feast.core.FeatureSet_pb2 import FeatureSetSpec as FeatureSetSpecProto +from feast.core.FeatureSetReference_pb2 import ( + FeatureSetReference as FeatureSetReferenceProto, +) from feast.entity import Entity from feast.feature import Feature, Field from feast.loaders import yaml as feast_yaml @@ -88,14 +91,7 @@ def __str__(self): return str(MessageToJson(self.to_proto())) def __repr__(self): - ref = "" - if self.project: - ref += self.project + "/" - if self.name: - ref += self.name - if self.version: - ref += ":" + str(self.version).strip() - return ref + return FeatureSetRef.from_feature_set(self).__repr__() @property def fields(self) -> Dict[str, Field]: @@ -761,6 +757,104 @@ def to_proto(self) -> FeatureSetProto: return FeatureSetProto(spec=spec, meta=meta) +class FeatureSetRef: + """ + Represents a reference to a featureset + """ + + def __init__(self, project: str = None, name: str = None, version: int = None): + self.proto = FeatureSetReferenceProto( + project=project, name=name, version=version + ) + + @property + def project(self) -> str: + """ + Get the project of feature set referenced by this reference + """ + return self.proto.project + + @property + def name(self) -> str: + """ + Get the name of feature set referenced by this reference + """ + return self.proto.name + + @property + def version(self) -> int: + """ + Get the version of feature set referenced by this reference + """ + return self.proto.version + + @classmethod + def from_feature_set(cls, feature_set: FeatureSet): + """ + Construct a feature set reference that refers to the given feature set. + + Args: + feature_set: Feature set to create reference from. + + Returns: + FeatureSetRef that refers to the given feature set + """ + return cls(feature_set.project, feature_set.name, feature_set.version) + + @classmethod + def from_str(cls, ref_str: str): + """ + Parse a feature reference from string representation. + (as defined by __repr__()) + + Args: + ref_str: string representation of the reference. + + Returns: + FeatureSetRef constructed from the string + """ + if "/" in ref_str: + project, ref_str = ref_str.split("/") + if ":" in ref_str: + ref_str, version_str = ref_str.split(":") + name = ref_str + + return cls(project, name, int(version_str)) + + def to_proto(self, arg1) -> FeatureSetReferenceProto: + """ + Convert and return this feature set reference to protobuf. + + Returns: + Protobuf version of this feature set reference. + """ + return self.proto + + def __str__(self): + # human readable string of the reference + return f"FeatureSetRef<{self.__repr__()}>" + + def __repr__(self): + # return string representation of the reference + # [project/]name[:version] + ref_str = "" + if self.proto.project: + ref_str += self.proto.project + "/" + if self.proto.name: + ref_str += self.proto.name + if self.proto.version: + ref_str += ":" + str(self.proto.version).strip() + return ref_str + + def __eq__(self, other): + # compare with other feature set + return hash(self) == hash(other) + + def __hash__(self): + # hash this reference + return hash(repr(self)) + + def _infer_pd_column_type(column, series, rows_to_sample): dtype = None sample_count = 0 diff --git a/sdk/python/feast/job.py b/sdk/python/feast/job.py index ab65da7445..3576bc1b38 100644 --- a/sdk/python/feast/job.py +++ b/sdk/python/feast/job.py @@ -2,11 +2,15 @@ import time from datetime import datetime, timedelta from urllib.parse import urlparse +from typing import List import fastavro import pandas as pd from google.cloud import storage +from google.protobuf.json_format import MessageToJson +from feast.feature_set import FeatureSet +from feast.source import Source from feast.serving.ServingService_pb2 import ( DATA_FORMAT_AVRO, JOB_STATUS_DONE, @@ -14,8 +18,13 @@ ) from feast.serving.ServingService_pb2 import Job as JobProto from feast.serving.ServingService_pb2_grpc import ServingServiceStub +from feast.core.Store_pb2 import Store +from feast.core.IngestionJob_pb2 import IngestionJob as IngestJobProto +from feast.core.IngestionJob_pb2 import IngestionJobStatus +from feast.core.CoreService_pb2_grpc import CoreServiceStub +from feast.core.CoreService_pb2 import ListIngestionJobsRequest -# Maximum no of seconds to wait until the jobs status is DONE in Feast +# Maximum no of seconds to wait until the retrieval jobs status is DONE in Feast # Currently set to the maximum query execution time limit in BigQuery DEFAULT_TIMEOUT_SEC: int = 21600 @@ -23,7 +32,7 @@ MAX_WAIT_INTERVAL_SEC: int = 60 -class Job: +class RetrievalJob: """ A class representing a job for feature retrieval in Feast. """ @@ -33,7 +42,6 @@ def __init__(self, job_proto: JobProto, serving_stub: ServingServiceStub): Args: job_proto: Job proto object (wrapped by this job object) serving_stub: Stub for Feast serving service - storage_client: Google Cloud Storage client """ self.job_proto = job_proto self.serving_stub = serving_stub @@ -187,3 +195,107 @@ def to_chunked_dataframe( def __iter__(self): return iter(self.result()) + + +class IngestJob: + """ + Defines a job for feature ingestion in feast. + """ + + def __init__(self, job_proto: IngestJobProto, core_stub: CoreServiceStub): + """ + Construct a native ingest job from its protobuf version. + + Args: + job_proto: Job proto object to construct from. + core_stub: stub for Feast CoreService + """ + self.proto = job_proto + self.core_svc = core_stub + + def reload(self): + """ + Update this IngestJob with the latest info from Feast + """ + # pull latest proto from feast core + response = self.core_svc.ListIngestionJobs( + ListIngestionJobsRequest(filter=ListIngestionJobsRequest.Filter(id=self.id)) + ) + self.proto = response.jobs[0] + + @property + def id(self) -> str: + """ + Getter for IngestJob's job id. + """ + return self.proto.id + + @property + def external_id(self) -> str: + """ + Getter for IngestJob's external job id. + """ + self.reload() + return self.proto.external_id + + @property + def status(self) -> IngestionJobStatus: + """ + Getter for IngestJob's status + """ + self.reload() + return self.proto.status + + @property + def feature_sets(self) -> List[FeatureSet]: + """ + Getter for the IngestJob's feature sets + """ + # convert featureset protos to native objects + return [FeatureSet.from_proto(fs) for fs in self.proto.feature_sets] + + @property + def source(self) -> Source: + """ + Getter for the IngestJob's data source. + """ + return Source.from_proto(self.proto.source) + + @property + def store(self) -> Store: + """ + Getter for the IngestJob's target feast store. + """ + return self.proto.store + + def wait(self, status: IngestionJobStatus, timeout_secs: float = 300): + """ + Wait for this IngestJob to transtion to the given status. + Raises TimeoutError if the wait operation times out. + + Args: + status: The IngestionJobStatus to wait for. + timeout_secs: Maximum seconds to wait before timing out. + """ + # poll & wait for job status to transition + wait_begin = time.time() + wait_secs = 2 + elapsed_secs = 0 + while self.status != status and elapsed_secs <= timeout_secs: + time.sleep(wait_secs) + # back off wait duration exponentially, capped at MAX_WAIT_INTERVAL_SEC + wait_secs = min(wait_secs * 2, MAX_WAIT_INTERVAL_SEC) + elapsed_secs = time.time() - wait_begin + + # raise error if timeout + if elapsed_secs > timeout_secs: + raise TimeoutError("Wait for IngestJob's status to transition timed out") + + def __str__(self): + # render the contents of ingest job as human readable string + self.reload() + return str(MessageToJson(self.proto)) + + def __repr__(self): + # render the ingest job as human readable string + return f"IngestJob<{self.id}>" diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 3c1e8bef0f..f7f5676ced 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -29,6 +29,12 @@ from feast.core.CoreService_pb2 import ( GetFeastCoreVersionResponse, GetFeatureSetResponse, + ListIngestionJobsResponse, +) +from feast.core.Store_pb2 import Store +from feast.core.IngestionJob_pb2 import ( + IngestionJob as IngestJobProto, + IngestionJobStatus, ) from feast.core.FeatureSet_pb2 import EntitySpec as EntitySpecProto from feast.core.FeatureSet_pb2 import FeatureSet as FeatureSetProto @@ -38,7 +44,8 @@ from feast.core.FeatureSet_pb2 import FeatureSpec as FeatureSpecProto from feast.core.Source_pb2 import KafkaSourceConfig, Source, SourceType from feast.entity import Entity -from feast.feature_set import Feature, FeatureSet +from feast.feature_set import Feature, FeatureSet, FeatureSetRef +from feast.job import IngestJob from feast.serving.ServingService_pb2 import ( GetFeastServingInfoResponse, GetOnlineFeaturesRequest, @@ -295,7 +302,109 @@ def test_get_feature_set(self, mocked_client, mocker): and len(feature_set.entities) == 1 ) - # @pytest.mark.parametrize( + @pytest.mark.parametrize( + "mocked_client", + [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], + ) + def test_list_ingest_jobs(self, mocked_client, mocker): + mocker.patch.object( + mocked_client, + "_core_service_stub", + return_value=Core.CoreServiceStub(grpc.insecure_channel("")), + ) + + feature_set_proto = FeatureSetProto( + spec=FeatureSetSpecProto( + project="test", name="driver", max_age=Duration(seconds=3600), + ) + ) + + mocker.patch.object( + mocked_client._core_service_stub, + "ListIngestionJobs", + return_value=ListIngestionJobsResponse( + jobs=[ + IngestJobProto( + id="kafka-to-redis", + external_id="job-2222", + status=IngestionJobStatus.RUNNING, + feature_sets=[feature_set_proto], + source=Source( + type=SourceType.KAFKA, + kafka_source_config=KafkaSourceConfig( + bootstrap_servers="localhost:9092", topic="topic" + ), + ), + store=Store(name="redis"), + ) + ] + ), + ) + + # list ingestion jobs by target feature set reference + ingest_jobs = mocked_client.list_ingest_jobs( + feature_set_ref=FeatureSetRef.from_feature_set( + FeatureSet.from_proto(feature_set_proto) + ) + ) + assert len(ingest_jobs) >= 1 + + ingest_job = ingest_jobs[0] + assert ( + ingest_job.status == IngestionJobStatus.RUNNING + and ingest_job.id == "kafka-to-redis" + and ingest_job.external_id == "job-2222" + and ingest_job.feature_sets[0].name == "driver" + and ingest_job.source.source_type == "Kafka" + ) + + @pytest.mark.parametrize( + "mocked_client", + [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], + ) + def test_restart_ingest_job(self, mocked_client, mocker): + mocker.patch.object( + mocked_client, + "_core_service_stub", + return_value=Core.CoreServiceStub(grpc.insecure_channel("")), + ) + + ingest_job = IngestJob( + job_proto=IngestJobProto( + id="kafka-to-redis", + external_id="job#2222", + status=IngestionJobStatus.ERROR, + ), + core_stub=mocked_client._core_service_stub, + ) + + mocked_client.restart_ingest_job(ingest_job) + assert mocked_client._core_service_stub.RestartIngestionJob.called + + @pytest.mark.parametrize( + "mocked_client", + [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], + ) + def test_stop_ingest_job(self, mocked_client, mocker): + mocker.patch.object( + mocked_client, + "_core_service_stub", + return_value=Core.CoreServiceStub(grpc.insecure_channel("")), + ) + + ingest_job = IngestJob( + job_proto=IngestJobProto( + id="kafka-to-redis", + external_id="job#2222", + status=IngestionJobStatus.RUNNING, + ), + core_stub=mocked_client._core_service_stub, + ) + + mocked_client.stop_ingest_job(ingest_job) + assert mocked_client._core_service_stub.StopIngestionJob.called + + # @pytest.mark.parametrize # "mocked_client", # [pytest.lazy_fixture("mock_client"), pytest.lazy_fixture("secure_mock_client")], # ) diff --git a/sdk/python/tests/test_feature_set.py b/sdk/python/tests/test_feature_set.py index 2c539ebe0a..bd31d712bb 100644 --- a/sdk/python/tests/test_feature_set.py +++ b/sdk/python/tests/test_feature_set.py @@ -23,7 +23,7 @@ import feast.core.CoreService_pb2_grpc as Core from feast.client import Client from feast.entity import Entity -from feast.feature_set import Feature, FeatureSet +from feast.feature_set import Feature, FeatureSet, FeatureSetRef from feast.value_type import ValueType from feast_core_server import CoreServicer @@ -167,3 +167,20 @@ def test_add_features_from_df_success( ) assert len(my_feature_set.features) == feature_count assert len(my_feature_set.entities) == entity_count + + +class TestFeatureSetRef: + def test_from_feature_set(self): + feature_set = FeatureSet("test", "test") + feature_set.version = 2 + ref = FeatureSetRef.from_feature_set(feature_set) + + assert ref.name == "test" + assert ref.project == "test" + assert ref.version == 2 + + def test_str_ref(self): + original_ref = FeatureSetRef(project="test", name="test", version=2) + ref_str = repr(original_ref) + parsed_ref = FeatureSetRef.from_str(ref_str) + assert original_ref == parsed_ref diff --git a/tests/e2e/basic-ingest-redis-serving.py b/tests/e2e/basic-ingest-redis-serving.py index 1aeccfa5a3..8e40794344 100644 --- a/tests/e2e/basic-ingest-redis-serving.py +++ b/tests/e2e/basic-ingest-redis-serving.py @@ -7,9 +7,10 @@ GetOnlineFeaturesRequest, GetOnlineFeaturesResponse, ) +from feast.core.IngestionJob_pb2 import IngestionJobStatus from feast.types.Value_pb2 import Value as Value from feast.client import Client -from feast.feature_set import FeatureSet +from feast.feature_set import FeatureSet, FeatureSetRef from feast.type_map import ValueType from google.protobuf.duration_pb2 import Duration from datetime import datetime @@ -108,7 +109,6 @@ def test_basic_ingest_success(client, basic_dataframe): client.ingest(cust_trans_fs, basic_dataframe) time.sleep(5) - @pytest.mark.timeout(45) @pytest.mark.run(order=12) def test_basic_retrieve_online_success(client, basic_dataframe): @@ -152,6 +152,28 @@ def test_basic_retrieve_online_success(client, basic_dataframe): ): break +@pytest.mark.timeout(300) +@pytest.mark.run(order=19) +def test_basic_ingest_jobs(client, basic_dataframe): + # list ingestion jobs given featureset + cust_trans_fs = client.get_feature_set(name="customer_transactions") + ingest_jobs = client.list_ingest_jobs( + feature_set_ref=FeatureSetRef.from_feature_set(cust_trans_fs)) + # filter ingestion jobs to only those that are running + ingest_jobs = [job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING] + assert len(ingest_jobs) >= 1 + + for ingest_job in ingest_jobs: + # restart ingestion ingest_job + client.restart_ingest_job(ingest_job) + ingest_job.wait(IngestionJobStatus.RUNNING) + assert ingest_job.status == IngestionJobStatus.RUNNING + + # stop ingestion ingest_job + client.stop_ingest_job(ingest_job) + ingest_job.wait(IngestionJobStatus.ABORTED) + assert ingest_job.status == IngestionJobStatus.ABORTED + @pytest.fixture(scope='module') def all_types_dataframe(): @@ -311,6 +333,27 @@ def test_all_types_retrieve_online_success(client, all_types_dataframe): ): break +@pytest.mark.timeout(300) +@pytest.mark.run(order=29) +def test_all_types_ingest_jobs(client, all_types_dataframe): + # list ingestion jobs given featureset + all_types_fs = client.get_feature_set(name="all_types") + ingest_jobs = client.list_ingest_jobs( + feature_set_ref=FeatureSetRef.from_feature_set(all_types_fs)) + # filter ingestion jobs to only those that are running + ingest_jobs = [job for job in ingest_jobs if job.status == IngestionJobStatus.RUNNING] + assert len(ingest_jobs) >= 1 + + for ingest_job in ingest_jobs: + # restart ingestion ingest_job + client.restart_ingest_job(ingest_job) + ingest_job.wait(IngestionJobStatus.RUNNING) + assert ingest_job.status == IngestionJobStatus.RUNNING + + # stop ingestion ingest_job + client.stop_ingest_job(ingest_job) + ingest_job.wait(IngestionJobStatus.ABORTED) + assert ingest_job.status == IngestionJobStatus.ABORTED @pytest.fixture(scope='module') def large_volume_dataframe(): @@ -466,7 +509,6 @@ def all_types_parquet_file(): df.to_parquet(file_path, allow_truncated_timestamps=True) return file_path - @pytest.mark.timeout(300) @pytest.mark.run(order=40) def test_all_types_parquet_register_feature_set_success(client):