diff --git a/core/src/main/java/feast/core/dao/JobRepository.java b/core/src/main/java/feast/core/dao/JobRepository.java
index d2849a8eda..244c0d5eff 100644
--- a/core/src/main/java/feast/core/dao/JobRepository.java
+++ b/core/src/main/java/feast/core/dao/JobRepository.java
@@ -41,5 +41,5 @@ public interface JobRepository extends JpaRepository<Job, String> {
   List<Job> findByFeatureSetJobStatusesIn(List<FeatureSetJobStatus> featureSetsJobStatuses);
 
   // find jobs by feast store name
-  List<Job> findByStoreName(String storeName);
+  List<Job> findByStoresName(String storeName);
 }
diff --git a/core/src/main/java/feast/core/job/CreateJobTask.java b/core/src/main/java/feast/core/job/CreateJobTask.java
new file mode 100644
index 0000000000..867dcfc02a
--- /dev/null
+++ b/core/src/main/java/feast/core/job/CreateJobTask.java
@@ -0,0 +1,84 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job;
+
+import feast.core.log.Action;
+import feast.core.model.Job;
+import feast.core.model.JobStatus;
+import feast.core.model.Source;
+import java.time.Instant;
+import java.util.Objects;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task that starts a recently created {@link Job} by using {@link JobManager}. Since the job is
+ * new, its id is generated from the attached {@link Source} and updated accordingly in-place.
+ */
+@Getter
+@Setter
+@Builder(setterPrefix = "set")
+public class CreateJobTask implements JobTask {
+  final Logger log = LoggerFactory.getLogger(CreateJobTask.class);
+
+  private Job job;
+  private JobManager jobManager;
+
+  @Override
+  public Job call() {
+    String jobId = createJobId(job.getSource());
+    String runnerName = jobManager.getRunnerType().toString();
+
+    job.setRunner(jobManager.getRunnerType());
+    job.setStatus(JobStatus.PENDING);
+    job.setId(jobId);
+
+    try {
+      JobTask.logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);
+
+      job = jobManager.startJob(job);
+      var extId = job.getExtId();
+      if (extId.isEmpty()) {
+        throw new RuntimeException(
+            String.format("Could not submit job: \n%s", "unable to retrieve job external id"));
+      }
+
+      var auditMessage = "Job submitted to runner %s with ext id %s.";
+      JobTask.logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId);
+
+      return job;
+    } catch (Exception e) {
+      log.error(e.getMessage());
+      var auditMessage = "Job failed to be submitted to runner %s. Job status changed to ERROR.";
+      JobTask.logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName);
+
+      job.setStatus(JobStatus.ERROR);
+      return job;
+    }
+  }
+
+  String createJobId(Source source) {
+    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
+    String jobId =
+        String.format(
+            "%s-%d-%s", source.getTypeString(), Objects.hashCode(source.getConfig()), dateSuffix);
+    return jobId.replaceAll("_store", "-").toLowerCase();
+  }
+}
diff --git a/core/src/main/java/feast/core/job/JobTask.java b/core/src/main/java/feast/core/job/JobTask.java
new file mode 100644
index 0000000000..c7809c4ab8
--- /dev/null
+++ b/core/src/main/java/feast/core/job/JobTask.java
@@ -0,0 +1,32 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job;
+
+import feast.core.log.Action;
+import feast.core.log.AuditLogger;
+import feast.core.log.Resource;
+import feast.core.model.Job;
+import java.util.concurrent.Callable;
+
+public interface JobTask extends Callable<Job> {
+  static void logAudit(Action action, Job job, String detail, Object... args) {
+    AuditLogger.log(Resource.JOB, job.getId(), action, detail, args);
+  }
+
+  @Override
+  Job call() throws RuntimeException;
+}
diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java
deleted file mode 100644
index a6a29f7d63..0000000000
--- a/core/src/main/java/feast/core/job/JobUpdateTask.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- * Copyright 2018-2019 The Feast Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
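The task classes in this diff (CreateJobTask, TerminateJobTask, UpdateJobStatusTask, UpgradeJobTask) replace the monolithic JobUpdateTask with single-purpose Callable<Job> units. A minimal usage sketch, assuming a caller that already holds a Job and a JobManager; the class and method names here are illustrative, not part of the diff:

    import feast.core.model.Job;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    class TaskUsageSketch {
      // Submit a CreateJobTask; note that call() never throws: on failure the
      // task logs the error, flips the Job to ERROR and returns it anyway.
      Future<Job> submitCreate(ExecutorService executor, Job job, JobManager jobManager) {
        JobTask task = CreateJobTask.builder().setJob(job).setJobManager(jobManager).build();
        return executor.submit(task);
      }
    }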
- */ -package feast.core.job; - -import com.google.common.collect.Sets; -import feast.core.log.Action; -import feast.core.log.AuditLogger; -import feast.core.log.Resource; -import feast.core.model.*; -import feast.proto.core.FeatureSetProto; -import java.time.Instant; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -/** - * JobUpdateTask is a callable that starts or updates a job given a set of featureSetSpecs, as well - * their source and sink to transition to targetStatus. - * - *
<p>
When complete, the JobUpdateTask returns the updated Job object to be pushed to the db. - */ -@Slf4j -@Getter -public class JobUpdateTask implements Callable { - - /** - * JobTargetStatus enum defines the possible target statuses that JobUpdateTask can transition a - * Job to. - */ - public enum JobTargetStatus { - RUNNING, - ABORTED - } - - private final List featureSets; - private final Source source; - private final Store store; - private final JobTargetStatus targetStatus; - private final Optional currentJob; - private final JobManager jobManager; - private final long jobUpdateTimeoutSeconds; - private final String runnerName; - - public JobUpdateTask( - List featureSets, - Source source, - Store store, - Optional currentJob, - JobManager jobManager, - long jobUpdateTimeoutSeconds, - JobTargetStatus targetStatus) { - this.featureSets = featureSets; - this.source = source; - this.store = store; - this.currentJob = currentJob; - this.jobManager = jobManager; - this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds; - this.runnerName = jobManager.getRunnerType().toString(); - this.targetStatus = targetStatus; - } - - @Override - public Job call() { - ExecutorService executorService = Executors.newSingleThreadExecutor(); - Future submittedJob; - - if (this.targetStatus.equals(JobTargetStatus.RUNNING) && currentJob.isEmpty()) { - submittedJob = executorService.submit(this::createJob); - } else if (this.targetStatus.equals(JobTargetStatus.RUNNING) - && currentJob.isPresent() - && requiresUpdate(currentJob.get())) { - submittedJob = executorService.submit(() -> updateJob(currentJob.get())); - } else if (this.targetStatus.equals(JobTargetStatus.ABORTED) - && currentJob.isPresent() - && currentJob.get().getStatus() == JobStatus.RUNNING) { - submittedJob = executorService.submit(() -> stopJob(currentJob.get())); - } else if (this.targetStatus.equals(JobTargetStatus.ABORTED) && currentJob.isEmpty()) { - throw new IllegalArgumentException("Cannot abort an nonexistent ingestion job."); - } else { - return this.updateStatus(currentJob.get()); - } - - try { - return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - log.warn("Unable to start job for source {} and sink {}:", source, store); - e.printStackTrace(); - return null; - } finally { - executorService.shutdownNow(); - } - } - - boolean requiresUpdate(Job job) { - // if store subscriptions have changed - if (!Sets.newHashSet(store.getSubscriptions()) - .equals(Sets.newHashSet(job.getStore().getSubscriptions()))) { - return true; - } - - return false; - } - - private Job createJob() { - String jobId = createJobId(source, store.getName()); - return startJob(jobId); - } - - /** Start or update the job to ingest data to the sink. 
*/ - private Job startJob(String jobId) { - Job job = - Job.builder() - .setId(jobId) - .setRunner(jobManager.getRunnerType()) - .setSource(source) - .setStore(store) - .setStatus(JobStatus.PENDING) - .setFeatureSetJobStatuses(new HashSet<>()) - .build(); - - updateFeatureSets(job); - - try { - logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName); - - System.out.println( - job.equals( - Job.builder() - .setId("job") - .setExtId("") - .setRunner(Runner.DATAFLOW) - .setSource(source) - .setStore(store) - .setStatus(JobStatus.PENDING) - .build())); - - job = jobManager.startJob(job); - var extId = job.getExtId(); - if (extId.isEmpty()) { - throw new RuntimeException( - String.format("Could not submit job: \n%s", "unable to retrieve job external id")); - } - - var auditMessage = "Job submitted to runner %s with ext id %s."; - logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId); - - return job; - } catch (Exception e) { - log.error(e.getMessage()); - var auditMessage = "Job failed to be submitted to runner %s. Job status changed to ERROR."; - logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName); - - job.setStatus(JobStatus.ERROR); - return job; - } - } - - private void updateFeatureSets(Job job) { - Map alreadyConnected = - job.getFeatureSetJobStatuses().stream() - .collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s)); - - for (FeatureSet fs : featureSets) { - if (alreadyConnected.containsKey(fs)) { - continue; - } - - FeatureSetJobStatus status = new FeatureSetJobStatus(); - status.setFeatureSet(fs); - status.setJob(job); - if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) { - // Feature Set was already delivered to previous generation of the job - // (another words, it exists in kafka) - // so we expect Job will ack latest version based on history from kafka topic - status.setVersion(fs.getVersion()); - } - status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - job.getFeatureSetJobStatuses().add(status); - } - } - - /** Update the given job */ - private Job updateJob(Job job) { - updateFeatureSets(job); - job.setStore(store); - logAudit(Action.UPDATE, job, "Updating job %s for runner %s", job.getId(), runnerName); - return jobManager.updateJob(job); - } - - /** Stop the given job */ - private Job stopJob(Job job) { - logAudit(Action.ABORT, job, "Aborting job %s for runner %s", job.getId(), runnerName); - return jobManager.abortJob(job); - } - - private Job updateStatus(Job job) { - JobStatus currentStatus = job.getStatus(); - JobStatus newStatus = jobManager.getJobStatus(job); - if (newStatus != currentStatus) { - var auditMessage = "Job status updated: changed from %s to %s"; - logAudit(Action.STATUS_CHANGE, job, auditMessage, currentStatus, newStatus); - } - - job.setStatus(newStatus); - updateFeatureSets(job); - return job; - } - - String createJobId(Source source, String storeName) { - String dateSuffix = String.valueOf(Instant.now().toEpochMilli()); - String jobId = - String.format( - "%s-%d-to-%s-%s", - source.getTypeString(), Objects.hashCode(source.getConfig()), storeName, dateSuffix); - return jobId.replaceAll("_store", "-").toLowerCase(); - } - - private void logAudit(Action action, Job job, String detail, Object... 
args) { - AuditLogger.log(Resource.JOB, job.getId(), action, detail, args); - } -} diff --git a/core/src/main/java/feast/core/job/TerminateJobTask.java b/core/src/main/java/feast/core/job/TerminateJobTask.java new file mode 100644 index 0000000000..c408578a3b --- /dev/null +++ b/core/src/main/java/feast/core/job/TerminateJobTask.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job; + +import feast.core.log.Action; +import feast.core.model.Job; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +/** Task to terminate given {@link Job} by using {@link JobManager} */ +@Getter +@Setter +@Builder(setterPrefix = "set") +public class TerminateJobTask implements JobTask { + private Job job; + private JobManager jobManager; + + @Override + public Job call() { + JobTask.logAudit( + Action.ABORT, + job, + "Aborting job %s for runner %s", + job.getId(), + jobManager.getRunnerType().toString()); + return jobManager.abortJob(job); + } +} diff --git a/core/src/main/java/feast/core/job/UpdateJobStatusTask.java b/core/src/main/java/feast/core/job/UpdateJobStatusTask.java new file mode 100644 index 0000000000..9ee4d2f1ee --- /dev/null +++ b/core/src/main/java/feast/core/job/UpdateJobStatusTask.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
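TerminateJobTask above is the abort path. A short sketch, assuming extraJob and jobManager exist; for the direct runner, abortJob marks the job ABORTING rather than deleting it, as DirectRunnerJobManager further down shows:

    Job aborting =
        TerminateJobTask.builder().setJob(extraJob).setJobManager(jobManager).build().call();
    // aborting.getStatus() is JobStatus.ABORTING until the runner confirms shutdown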
+ */ +package feast.core.job; + +import feast.core.log.Action; +import feast.core.model.Job; +import feast.core.model.JobStatus; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +/** + * Task that retrieves status from {@link JobManager} on given {@link Job} and update the job + * accordingly in-place + */ +@Getter +@Setter +@Builder(setterPrefix = "set") +public class UpdateJobStatusTask implements JobTask { + private Job job; + private JobManager jobManager; + + @Override + public Job call() { + JobStatus currentStatus = job.getStatus(); + JobStatus newStatus = jobManager.getJobStatus(job); + + if (newStatus != currentStatus) { + var auditMessage = "Job status updated: changed from %s to %s"; + JobTask.logAudit(Action.STATUS_CHANGE, job, auditMessage, currentStatus, newStatus); + } + + job.setStatus(newStatus); + return job; + } +} diff --git a/core/src/main/java/feast/core/job/UpgradeJobTask.java b/core/src/main/java/feast/core/job/UpgradeJobTask.java new file mode 100644 index 0000000000..e7de8f5e27 --- /dev/null +++ b/core/src/main/java/feast/core/job/UpgradeJobTask.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
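A sketch of the reconciliation step in UpdateJobStatusTask above, assuming job and jobManager: the runner is treated as the source of truth for job state, and an audit entry is written only when the state actually changed.

    Job refreshed =
        UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build().call();
    if (JobStatus.getTerminalStates().contains(refreshed.getStatus())) {
      // terminal jobs are excluded by getOrCreateJob, so the next Poll() round
      // builds a fresh Job for this source and submits a CreateJobTask for it
    }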
+ */ +package feast.core.job; + +import feast.core.log.Action; +import feast.core.model.Job; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +/** Task that upgrade given {@link Job} by restarting it in {@link JobManager} */ +@Getter +@Setter +@Builder(setterPrefix = "set") +public class UpgradeJobTask implements JobTask { + private JobManager jobManager; + private Job job; + + @Override + public Job call() { + JobTask.logAudit( + Action.UPDATE, + job, + "Updating job %s for runner %s", + job.getId(), + jobManager.getRunnerType().toString()); + + return jobManager.updateJob(job); + } +} diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 8b1c03d546..a2937ee621 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -17,6 +17,7 @@ package feast.core.job.dataflow; import static feast.core.util.PipelineUtil.detectClassPathResourcesToStage; +import static feast.core.util.StreamUtil.wrapException; import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; @@ -40,7 +41,8 @@ import feast.proto.core.StoreProto; import java.io.IOException; import java.security.GeneralSecurityException; -import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -114,18 +116,27 @@ public Job startJob(Job job) { try { String extId = submitDataflowJob( - job.getId(), job.getSource().toProto(), job.getStore().toProto(), false); + job.getId(), + job.getSource().toProto(), + job.getStores().stream() + .map(wrapException(Store::toProto)) + .collect(Collectors.toSet()), + false); job.setExtId(extId); job.setStatus(JobStatus.RUNNING); return job; - } catch (InvalidProtocolBufferException e) { + } catch (RuntimeException e) { log.error(e.getMessage()); - throw new IllegalArgumentException( - String.format( - "DataflowJobManager failed to START job with id '%s' because the job" - + "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s", - job.getId(), e.getMessage())); + if (e.getCause() instanceof InvalidProtocolBufferException) { + throw new IllegalArgumentException( + String.format( + "DataflowJobManager failed to START job with id '%s' because the job" + + "has an invalid spec. Please check the FeatureSet, Source and Store specs. 
Actual error message: %s", + job.getId(), e.getMessage())); + } + + throw e; } } @@ -221,9 +232,9 @@ public JobStatus getJobStatus(Job job) { } private String submitDataflowJob( - String jobName, SourceProto.Source source, StoreProto.Store sink, boolean update) { + String jobName, SourceProto.Source source, Set sinks, boolean update) { try { - ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sink, update); + ImportOptions pipelineOptions = getPipelineOptions(jobName, source, sinks, update); DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions); String jobId = waitForJobToRun(pipelineResult); return jobId; @@ -234,7 +245,7 @@ private String submitDataflowJob( } private ImportOptions getPipelineOptions( - String jobName, SourceProto.Source source, StoreProto.Store sink, boolean update) + String jobName, SourceProto.Source source, Set sinks, boolean update) throws IOException, IllegalAccessException { ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); @@ -244,7 +255,8 @@ private ImportOptions getPipelineOptions( pipelineOptions.setSpecsStreamingUpdateConfigJson( jsonPrinter.print(specsStreamingUpdateConfig)); pipelineOptions.setSourceJson(jsonPrinter.print(source)); - pipelineOptions.setStoreJson(Collections.singletonList(jsonPrinter.print(sink))); + pipelineOptions.setStoresJson( + sinks.stream().map(wrapException(jsonPrinter::print)).collect(Collectors.toList())); pipelineOptions.setProject(projectId); pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME); pipelineOptions.setUpdate(update); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index b8b909c02a..8ee326affa 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -16,6 +16,8 @@ */ package feast.core.job.direct; +import static feast.core.util.StreamUtil.wrapException; + import com.google.common.base.Strings; import com.google.protobuf.util.JsonFormat; import feast.core.config.FeastProperties.MetricsProperties; @@ -30,7 +32,8 @@ import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; import java.io.IOException; -import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -71,7 +74,12 @@ public Runner getRunnerType() { public Job startJob(Job job) { try { ImportOptions pipelineOptions = - getPipelineOptions(job.getId(), job.getSource().toProto(), job.getStore().toProto()); + getPipelineOptions( + job.getId(), + job.getSource().toProto(), + job.getStores().stream() + .map(wrapException(Store::toProto)) + .collect(Collectors.toSet())); PipelineResult pipelineResult = runPipeline(pipelineOptions); DirectJob directJob = new DirectJob(job.getId(), pipelineResult); jobs.add(directJob); @@ -85,16 +93,17 @@ public Job startJob(Job job) { } private ImportOptions getPipelineOptions( - String jobName, SourceProto.Source source, StoreProto.Store sink) + String jobName, SourceProto.Source source, Set sinks) throws IOException, IllegalAccessException { ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(defaultOptions.toArgs()).as(ImportOptions.class); - pipelineOptions.setSpecsStreamingUpdateConfigJson( - 
JsonFormat.printer().print(specsStreamingUpdateConfig)); - pipelineOptions.setSourceJson(JsonFormat.printer().print(source)); + JsonFormat.Printer printer = JsonFormat.printer(); + pipelineOptions.setSpecsStreamingUpdateConfigJson(printer.print(specsStreamingUpdateConfig)); + pipelineOptions.setSourceJson(printer.print(source)); pipelineOptions.setJobName(jobName); - pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); + pipelineOptions.setStoresJson( + sinks.stream().map(wrapException(printer::print)).collect(Collectors.toList())); pipelineOptions.setRunner(DirectRunner.class); pipelineOptions.setDefaultFeastProject(Project.DEFAULT_NAME); pipelineOptions.setProject(""); // set to default value to satisfy validation @@ -137,13 +146,15 @@ public Job updateJob(Job job) { @Override public Job abortJob(Job job) { DirectJob directJob = jobs.get(job.getExtId()); - try { - directJob.abort(); - } catch (IOException e) { - throw new RuntimeException( - Strings.lenientFormat("Unable to abort DirectRunner job %s", job.getExtId(), e)); + if (directJob != null) { + try { + directJob.abort(); + } catch (IOException e) { + throw new RuntimeException( + Strings.lenientFormat("Unable to abort DirectRunner job %s", job.getExtId(), e)); + } + jobs.remove(job.getExtId()); } - jobs.remove(job.getExtId()); job.setStatus(JobStatus.ABORTING); return job; diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index fb8b071b81..3df1d49c23 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -16,6 +16,8 @@ */ package feast.core.model; +import static feast.core.util.StreamUtil.wrapException; + import com.google.protobuf.InvalidProtocolBufferException; import feast.core.job.Runner; import feast.proto.core.FeatureSetProto; @@ -73,10 +75,21 @@ public JobBuilder setSource(Source source) { @Column(name = "source_config") private String sourceConfig; - // Sink id - @ManyToOne - @JoinColumn(name = "store_name") - private Store store; + // Sinks + @ManyToMany + @JoinTable( + name = "jobs_stores", + joinColumns = @JoinColumn(name = "job_id"), + inverseJoinColumns = @JoinColumn(name = "store_name"), + indexes = { + @Index(name = "idx_jobs_stores_job_id", columnList = "job_id"), + @Index(name = "idx_jobs_stores_store_name", columnList = "store_name") + }) + private Set stores; + + @Deprecated + @Column(name = "store_name") + String storeName; // FeatureSets populated by the job via intermediate FeatureSetJobStatus model @OneToMany(mappedBy = "job", cascade = CascadeType.ALL) @@ -98,8 +111,8 @@ public boolean isRunning() { return getStatus() == JobStatus.RUNNING; } - public String getSinkName() { - return store.getName(); + public boolean isDeployed() { + return getExtId() != null && !getExtId().isEmpty(); } public List getFeatureSets() { @@ -135,7 +148,10 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce .setExternalId(this.getExtId()) .setStatus(this.getStatus().toProto()) .setSource(this.getSource().toProto()) - .setStore(this.getStore().toProto()) + .addAllStores( + this.getStores().stream() + .map(wrapException(Store::toProto)) + .collect(Collectors.toSet())) .addAllFeatureSets(featureSetProtos) .build(); @@ -144,7 +160,7 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce @Override public int hashCode() { - return Objects.hash(getSource(), this.store, this.runner); + return Objects.hash(getSource(), this.stores, 
this.runner); } @Override @@ -157,7 +173,7 @@ public boolean equals(Object obj) { return false; } else if (!getSource().equals(other.getSource())) { return false; - } else if (!store.equals(other.store)) { + } else if (!stores.equals(other.stores)) { return false; } return true; diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 83ffe075c7..7ed3f61807 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -18,15 +18,14 @@ import static feast.core.model.FeatureSet.parseReference; +import com.google.common.collect.Sets; import com.google.protobuf.InvalidProtocolBufferException; import feast.core.config.FeastProperties; import feast.core.config.FeastProperties.JobProperties; import feast.core.dao.FeatureSetRepository; import feast.core.dao.JobRepository; import feast.core.dao.SourceRepository; -import feast.core.job.JobManager; -import feast.core.job.JobUpdateTask; -import feast.core.job.JobUpdateTask.JobTargetStatus; +import feast.core.job.*; import feast.core.model.*; import feast.core.model.FeatureSet; import feast.core.model.Job; @@ -37,12 +36,7 @@ import feast.proto.core.CoreServiceProto.ListStoresResponse; import feast.proto.core.FeatureSetProto; import feast.proto.core.IngestionJobProto; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -102,8 +96,8 @@ public JobCoordinatorService( @Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}") public void Poll() throws InvalidProtocolBufferException { log.info("Polling for new jobs..."); - List> sourceStoreMappings = getSourceToStoreMappings(); - List jobUpdateTasks = makeJobUpdateTasks(sourceStoreMappings); + Map> sourceStoreMappings = getSourceToStoreMappings(); + List jobUpdateTasks = makeJobUpdateTasks(sourceStoreMappings); if (jobUpdateTasks.isEmpty()) { log.info("No jobs found."); @@ -114,7 +108,7 @@ public void Poll() throws InvalidProtocolBufferException { startOrUpdateJobs(jobUpdateTasks); } - void startOrUpdateJobs(List tasks) { + void startOrUpdateJobs(List tasks) { ExecutorService executorService = Executors.newFixedThreadPool(tasks.size()); ExecutorCompletionService ecs = new ExecutorCompletionService<>(executorService); tasks.forEach(ecs::submit); @@ -123,11 +117,11 @@ void startOrUpdateJobs(List tasks) { List startedJobs = new ArrayList<>(); while (completedTasks < tasks.size()) { try { - Job job = ecs.take().get(); + Job job = ecs.take().get(jobProperties.getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS); if (job != null) { startedJobs.add(job); } - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException | InterruptedException | TimeoutException e) { log.warn("Unable to start or update job: {}", e.getMessage()); } completedTasks++; @@ -143,68 +137,132 @@ void startOrUpdateJobs(List tasks) { * stop ingestion jobs its the required ingestions to maintained ingestion jobs are already * RUNNING. * - * @param sourceStoreMappings a list of source to store pairs where ingestion jobs would have to + * @param sourceStoresMappings a list of source to store pairs where ingestion jobs would have to * be maintained for ingestion to work correctly. 
* @return list of job update tasks required to reconcile the current ingestion jobs to the state * that is defined by sourceStoreMap. */ - private List makeJobUpdateTasks(List> sourceStoreMappings) { - List jobUpdateTasks = new LinkedList<>(); + List makeJobUpdateTasks(Map> sourceStoresMappings) { + List jobTasks = new LinkedList<>(); // Ensure a running job for each source to store mapping - long updateTimeout = jobProperties.getJobUpdateTimeoutSeconds(); - List requiredJobs = new LinkedList<>(); + List activeJobs = new LinkedList<>(); boolean isSafeToStopJobs = true; - for (Pair mapping : sourceStoreMappings) { - Source source = mapping.getLeft(); - Store store = mapping.getRight(); - - Optional sourceToStoreJob = getJob(source, store); - if (sourceToStoreJob.isPresent()) { - Job job = sourceToStoreJob.get(); - // Record the job as required to safeguard it from getting stopped - requiredJobs.add(sourceToStoreJob.get()); + + for (Map.Entry> mapping : sourceStoresMappings.entrySet()) { + Source source = mapping.getKey(); + Set stores = mapping.getValue(); + Set featureSets = + stores.stream() + .flatMap(s -> getFeatureSetsForStore(s).stream()) + .collect(Collectors.toSet()); + + Job job = getOrCreateJob(source, stores); + + if (job.isDeployed()) { if (!job.isRunning()) { + jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build()); + // Mark that it is not safe to stop jobs without disrupting ingestion isSafeToStopJobs = false; + continue; } + + if (jobRequiresUpgrade(job, stores)) { + job.setStores(stores); + + jobTasks.add(UpgradeJobTask.builder().setJob(job).setJobManager(jobManager).build()); + } else { + jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build()); + } + } else { + jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build()); } - jobUpdateTasks.add( - new JobUpdateTask( - getFeatureSetsForStore(store), - source, - store, - sourceToStoreJob, - jobManager, - updateTimeout, - JobTargetStatus.RUNNING)); + allocateFeatureSets(job, featureSets); + + // Record the job as required to safeguard it from getting stopped + activeJobs.add(job); } - // Stop extra jobs that are not required to mantain ingestion when safe + // Stop extra jobs that are not required to maintain ingestion when safe if (isSafeToStopJobs) { - getExtraJobs(requiredJobs) + getExtraJobs(activeJobs) .forEach( extraJob -> { - jobUpdateTasks.add( - new JobUpdateTask( - extraJob.getFeatureSets(), - extraJob.getSource(), - extraJob.getStore(), - Optional.of(extraJob), - jobManager, - updateTimeout, - JobTargetStatus.ABORTED)); + jobTasks.add( + TerminateJobTask.builder().setJob(extraJob).setJobManager(jobManager).build()); }); } - return jobUpdateTasks; + return jobTasks; } - /** Get the non terminated ingestion job ingesting between source and source if exists. */ - @Transactional - public Optional getJob(Source source, Store store) { + /** + * Decides whether we need to upgrade (restart) given job. Since we send updated FeatureSets to + * IngestionJob via Kafka, and there's only one source per job (if it change - new job would be + * created) the only things that can cause upgrade here are stores: new stores can be added, or + * existing stores will change subscriptions. 
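To illustrate the rule just described (redisStore and bigqueryStore are hypothetical; Store equality covers name and subscriptions):

    Set<Store> running = ImmutableSet.of(redisStore);
    Set<Store> desired = ImmutableSet.of(redisStore, bigqueryStore); // a store was added
    // the same check jobRequiresUpgrade performs below: set inequality forces an UpgradeJobTask
    boolean upgrade = !Sets.newHashSet(desired).equals(Sets.newHashSet(running)); // true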
+ * + * @param job {@link Job} to check + * @param stores Set of {@link Store} new version of stores (vs current version job.getStores()) + * @return boolean - need to upgrade + */ + private boolean jobRequiresUpgrade(Job job, Set stores) { + // if store subscriptions have changed + if (!Sets.newHashSet(stores).equals(Sets.newHashSet(job.getStores()))) { + return true; + } + + return false; + } + + /** + * Connects given {@link Job} with FeatureSets by creating {@link FeatureSetJobStatus}. This + * connection represents responsibility of the job to handle allocated FeatureSets. We use this + * connection {@link FeatureSetJobStatus} to monitor Ingestion of specific FeatureSet and Specs + * delivery status. + * + *
<p>
Only after this connection is created FeatureSetSpec could be sent to IngestionJob. + * + * @param job {@link Job} responsible job + * @param featureSets Set of {@link FeatureSet} featureSets to allocate to this job + */ + void allocateFeatureSets(Job job, Set featureSets) { + Map alreadyConnected = + job.getFeatureSetJobStatuses().stream() + .collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s)); + + for (FeatureSet fs : featureSets) { + if (alreadyConnected.containsKey(fs)) { + continue; + } + + FeatureSetJobStatus status = new FeatureSetJobStatus(); + status.setFeatureSet(fs); + status.setJob(job); + if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) { + // Feature Set was already delivered to previous generation of the job + // (another words, it exists in kafka) + // so we expect Job will ack latest version based on history from kafka topic + status.setVersion(fs.getVersion()); + } + status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); + job.getFeatureSetJobStatuses().add(status); + } + } + + /** Get the non terminated ingestion job ingesting for given source. */ + public Job getOrCreateJob(Source source, Set stores) { return jobRepository .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( - source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates()); + source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()) + .orElseGet( + () -> + Job.builder() + .setRunner(jobManager.getRunnerType()) + .setSource(source) + .setStores(stores) + .setFeatureSetJobStatuses(new HashSet<>()) + .build()); } /** Get running extra ingestion jobs that have ids not in keepJobs */ @@ -218,31 +276,31 @@ private Collection getExtraJobs(List keepJobs) { } /** - * Generate a source to store mapping. The mapping maps the source/store pairs in which ingestion - * jobs would have to be maintained for ingestion to work correctly. + * Generate a source to stores mapping. The mapping maps the source to Set-of-stores in which + * ingestion jobs would have to be maintained for ingestion to work correctly. * - * @return a list of pairs signifying a mapping from source to store. + * @return a Map from source to stores. */ - private List> getSourceToStoreMappings() { + private Map> getSourceToStoreMappings() { ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build()); List stores = listStoresResponse.getStoreList().stream() - .map(storeSpec -> Store.fromProto(storeSpec)) + .map(Store::fromProto) .collect(Collectors.toList()); // build mapping from source to store. // compile a set of sources via subscribed FeatureSets of stores. 
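The grouping in the rewritten getSourceToStoreMappings below collapses distinct (source, store) pairs into one entry per source. A toy run, assuming kafkaSource, redis and bigquery values (Pair is org.apache.commons.lang3.tuple.Pair, as elsewhere in this class):

    Map<Source, Set<Store>> mapping =
        Stream.of(Pair.of(kafkaSource, redis), Pair.of(kafkaSource, bigquery))
            .distinct()
            .collect(
                Collectors.groupingBy(
                    Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
    // mapping.get(kafkaSource) -> {redis, bigquery}: one job now feeds both stores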
- List> sourceStoreMappings = - stores.stream() - .flatMap( - store -> - getFeatureSetsForStore(store).stream() - .map(featureSet -> featureSet.getSource()) - .map(source -> Pair.of(source, store))) - .distinct() - .collect(Collectors.toList()); - return sourceStoreMappings; + return stores.stream() + .flatMap( + store -> + getFeatureSetsForStore(store).stream() + .map(FeatureSet::getSource) + .map(source -> Pair.of(source, store))) + .distinct() + .collect( + Collectors.groupingBy( + Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet()))); } /** diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java index b300959cd9..315988c3d3 100644 --- a/core/src/main/java/feast/core/service/JobService.java +++ b/core/src/main/java/feast/core/service/JobService.java @@ -108,7 +108,7 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request) // multiple filters can apply together in an 'and' operation if (!filter.getStoreName().isEmpty()) { // find jobs by name - List jobs = this.jobRepository.findByStoreName(filter.getStoreName()); + List jobs = this.jobRepository.findByStoresName(filter.getStoreName()); Set jobIds = jobs.stream().map(Job::getId).collect(Collectors.toSet()); matchingJobIds = this.mergeResults(matchingJobIds, jobIds); } diff --git a/core/src/main/java/feast/core/util/StreamUtil.java b/core/src/main/java/feast/core/util/StreamUtil.java new file mode 100644 index 0000000000..4925f02310 --- /dev/null +++ b/core/src/main/java/feast/core/util/StreamUtil.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
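The repository rename above (findByStoreName to findByStoresName) follows from the new @ManyToMany stores field on Job: Spring Data resolves the property path stores.name and joins through the jobs_stores table. Roughly, the derived query behaves like this sketch (the store name is assumed):

    // approximately: SELECT j.* FROM jobs j
    //                  JOIN jobs_stores js ON js.job_id = j.id
    //                 WHERE js.store_name = :storeName
    List<Job> jobs = jobRepository.findByStoresName("online-serving");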
+ */ +package feast.core.util; + +import java.util.function.Function; + +/** Collection of functions useful for stream-style programming */ +public class StreamUtil { + + @FunctionalInterface + public interface CheckedFunction { + R apply(T t) throws Exception; + } + + /** + * Wrap function to convert its checked exceptions into RuntimeException + * + * @param checkedFunction function that throws checked exception + * @param input + * @param output + * @return wrapped function that doesn't throw checked exceptions + */ + public static Function wrapException(CheckedFunction checkedFunction) { + return t -> { + try { + return checkedFunction.apply(t); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + } +} diff --git a/core/src/main/resources/db/migration/V2.1__Many_Stores_Per_Job.sql b/core/src/main/resources/db/migration/V2.1__Many_Stores_Per_Job.sql new file mode 100644 index 0000000000..c70df67039 --- /dev/null +++ b/core/src/main/resources/db/migration/V2.1__Many_Stores_Per_Job.sql @@ -0,0 +1,16 @@ +-- Migrating to Many2Many relationship between Job and Store + +CREATE TABLE jobs_stores( + job_id character varying(255) NOT NULL, + store_name character varying(255) NOT NULL +); + +ALTER TABLE ONLY jobs_stores + ADD CONSTRAINT jobs_stores_job_fkey FOREIGN KEY (job_id) REFERENCES jobs(id); + +ALTER TABLE ONLY jobs_stores + ADD CONSTRAINT jobs_stores_store_fkey FOREIGN KEY (store_name) REFERENCES stores(name); + + +CREATE INDEX idx_jobs_stores_job_id ON jobs_stores USING btree (job_id); +CREATE INDEX idx_jobs_stores_store_name ON jobs_stores USING btree (store_name); diff --git a/core/src/test/java/feast/core/job/JobTasksTest.java b/core/src/test/java/feast/core/job/JobTasksTest.java new file mode 100644 index 0000000000..950bd23d1e --- /dev/null +++ b/core/src/test/java/feast/core/job/JobTasksTest.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
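StreamUtil.wrapException above exists because Store::toProto throws the checked InvalidProtocolBufferException, which Stream.map cannot propagate; the wrapper rethrows it as a RuntimeException, which DataflowJobManager then unwraps via getCause(). The call-site pattern used throughout this diff, assuming a static import of wrapException:

    Set<StoreProto.Store> protos =
        job.getStores().stream()
            .map(wrapException(Store::toProto))
            .collect(Collectors.toSet());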
+ */ +package feast.core.job; + +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.google.common.collect.ImmutableSet; +import feast.core.model.*; +import feast.core.util.TestUtil; +import feast.proto.core.SourceProto; +import feast.proto.core.SourceProto.KafkaSourceConfig; +import feast.proto.core.SourceProto.SourceType; +import feast.proto.core.StoreProto; +import feast.proto.core.StoreProto.Store.RedisConfig; +import feast.proto.core.StoreProto.Store.StoreType; +import feast.proto.core.StoreProto.Store.Subscription; +import java.util.Collections; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +public class JobTasksTest { + private static final Runner RUNNER = Runner.DATAFLOW; + + @Mock private JobManager jobManager; + + private Store store; + private Source source; + private FeatureSet featureSet1; + private FeatureSet featureSet2; + + @Before + public void setUp() { + initMocks(this); + when(jobManager.getRunnerType()).thenReturn(RUNNER); + + store = + Store.fromProto( + StoreProto.Store.newBuilder() + .setName("test") + .setType(StoreType.REDIS) + .setRedisConfig(RedisConfig.newBuilder().build()) + .addSubscriptions(Subscription.newBuilder().setProject("*").setName("*").build()) + .build()); + + source = + Source.fromProto( + SourceProto.Source.newBuilder() + .setType(SourceType.KAFKA) + .setKafkaSourceConfig( + KafkaSourceConfig.newBuilder() + .setTopic("topic") + .setBootstrapServers("servers:9092") + .build()) + .build()); + } + + Job makeJob(String extId, List featureSets, JobStatus status) { + return Job.builder() + .setId("job") + .setExtId(extId) + .setRunner(RUNNER) + .setSource(source) + .setStores(ImmutableSet.of(store)) + .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets)) + .setStatus(status) + .build(); + } + + CreateJobTask makeCreateTask(Job currentJob) { + return CreateJobTask.builder().setJob(currentJob).setJobManager(jobManager).build(); + } + + UpgradeJobTask makeUpgradeTask(Job currentJob) { + return UpgradeJobTask.builder().setJob(currentJob).setJobManager(jobManager).build(); + } + + UpdateJobStatusTask makeCheckStatusTask(Job currentJob) { + return UpdateJobStatusTask.builder().setJob(currentJob).setJobManager(jobManager).build(); + } + + TerminateJobTask makeTerminateTask(Job currentJob) { + return TerminateJobTask.builder().setJob(currentJob).setJobManager(jobManager).build(); + } + + @Test + public void shouldCreateJobIfNotPresent() { + Job expectedInput = makeJob("", Collections.emptyList(), JobStatus.PENDING); + + CreateJobTask task = spy(makeCreateTask(expectedInput)); + doReturn("job").when(task).createJobId(source); + + Job expected = makeJob("ext", Collections.emptyList(), JobStatus.RUNNING); + + when(jobManager.startJob(expectedInput)).thenReturn(expected); + + Job actual = task.call(); + assertThat(actual, equalTo(expected)); + } + + @Test + public void shouldUpdateJobStatusIfNotCreateOrUpdate() { + Job originalJob = makeJob("ext", Collections.emptyList(), JobStatus.RUNNING); + JobTask jobUpdateTask = makeCheckStatusTask(originalJob); + + when(jobManager.getJobStatus(originalJob)).thenReturn(JobStatus.ABORTING); + Job updated = jobUpdateTask.call(); + + 
assertThat(updated.getStatus(), equalTo(JobStatus.ABORTING)); + } + + @Test + public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { + Job expectedInput = makeJob("", Collections.emptyList(), JobStatus.PENDING); + + CreateJobTask jobUpdateTask = spy(makeCreateTask(expectedInput)); + doReturn("job").when(jobUpdateTask).createJobId(source); + + Job expected = makeJob("", Collections.emptyList(), JobStatus.ERROR); + + when(jobManager.startJob(expectedInput)) + .thenThrow(new RuntimeException("Something went wrong")); + + Job actual = jobUpdateTask.call(); + assertThat(actual, equalTo(expected)); + } + + @Test + public void shouldStopJobIfTargetStatusIsAbort() { + Job originalJob = makeJob("ext", Collections.emptyList(), JobStatus.RUNNING); + JobTask jobUpdateTask = makeTerminateTask(originalJob); + + Job expected = makeJob("ext", Collections.emptyList(), JobStatus.ABORTING); + + when(jobManager.getJobStatus(originalJob)).thenReturn(JobStatus.ABORTING); + when(jobManager.abortJob(originalJob)).thenReturn(expected); + + Job actual = jobUpdateTask.call(); + verify(jobManager, times(1)).abortJob(originalJob); + assertThat(actual, equalTo(expected)); + } +} diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java deleted file mode 100644 index a9f19ba6d5..0000000000 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package feast.core.job; - -import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.core.Is.is; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - -import feast.core.job.JobUpdateTask.JobTargetStatus; -import feast.core.model.*; -import feast.core.util.TestUtil; -import feast.proto.core.FeatureSetProto; -import feast.proto.core.FeatureSetProto.FeatureSetMeta; -import feast.proto.core.FeatureSetProto.FeatureSetSpec; -import feast.proto.core.SourceProto; -import feast.proto.core.SourceProto.KafkaSourceConfig; -import feast.proto.core.SourceProto.SourceType; -import feast.proto.core.StoreProto; -import feast.proto.core.StoreProto.Store.RedisConfig; -import feast.proto.core.StoreProto.Store.StoreType; -import feast.proto.core.StoreProto.Store.Subscription; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import org.hamcrest.core.IsNull; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; - -public class JobUpdateTaskTest { - private static final Runner RUNNER = Runner.DATAFLOW; - - private static final FeatureSetProto.FeatureSet.Builder fsBuilder = - FeatureSetProto.FeatureSet.newBuilder().setMeta(FeatureSetMeta.newBuilder()); - private static final FeatureSetSpec.Builder specBuilder = FeatureSetSpec.newBuilder(); - - @Mock private JobManager jobManager; - - private Store store; - private Source source; - private FeatureSet featureSet1; - private FeatureSet featureSet2; - - @Before - public void setUp() { - initMocks(this); - when(jobManager.getRunnerType()).thenReturn(RUNNER); - - store = - Store.fromProto( - StoreProto.Store.newBuilder() - .setName("test") - .setType(StoreType.REDIS) - .setRedisConfig(RedisConfig.newBuilder().build()) - .addSubscriptions(Subscription.newBuilder().setProject("*").setName("*").build()) - .build()); - - source = - Source.fromProto( - SourceProto.Source.newBuilder() - .setType(SourceType.KAFKA) - .setKafkaSourceConfig( - KafkaSourceConfig.newBuilder() - .setTopic("topic") - .setBootstrapServers("servers:9092") - .build()) - .build()); - - featureSet1 = - FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet1")).build()); - featureSet2 = - FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet2")).build()); - featureSet1.setSource(source); - featureSet2.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY); - featureSet2.setVersion(5); - } - - Job makeJob(String extId, List featureSets, JobStatus status) { - return Job.builder() - .setId("job") - .setExtId(extId) - .setRunner(RUNNER) - .setSource(source) - .setStore(store) - .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSets)) - .setStatus(status) - .build(); - } - - JobUpdateTask makeTask( - List featureSets, Optional currentJob, JobTargetStatus targetStatus) { - return new JobUpdateTask( - featureSets, source, store, currentJob, jobManager, 100L, targetStatus); - } - - @Test - public void shouldUpdateJobIfPresent() { - FeatureSet featureSet2 = - FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet2")).build()); - List 
existingFeatureSetsPopulatedByJob = Collections.singletonList(featureSet1); - List newFeatureSetsPopulatedByJob = Arrays.asList(featureSet1, featureSet2); - - Job originalJob = makeJob("old_ext", existingFeatureSetsPopulatedByJob, JobStatus.RUNNING); - JobUpdateTask jobUpdateTask = - makeTask(newFeatureSetsPopulatedByJob, Optional.of(originalJob), JobTargetStatus.RUNNING); - Job submittedJob = makeJob("old_ext", newFeatureSetsPopulatedByJob, JobStatus.RUNNING); - - Job expected = makeJob("new_ext", newFeatureSetsPopulatedByJob, JobStatus.PENDING); - when(jobManager.updateJob(submittedJob)).thenReturn(expected); - Job actual = jobUpdateTask.call(); - - assertThat(actual, equalTo(expected)); - } - - @Test - public void shouldCreateJobIfNotPresent() { - var featureSets = Collections.singletonList(featureSet1); - JobUpdateTask jobUpdateTask = - spy(makeTask(featureSets, Optional.empty(), JobTargetStatus.RUNNING)); - doReturn("job").when(jobUpdateTask).createJobId(source, "test"); - - Job expectedInput = makeJob("", featureSets, JobStatus.PENDING); - Job expected = makeJob("ext", featureSets, JobStatus.RUNNING); - - when(jobManager.startJob(expectedInput)).thenReturn(expected); - - Job actual = jobUpdateTask.call(); - assertThat(actual, equalTo(expected)); - } - - @Test - public void shouldUpdateJobStatusIfNotCreateOrUpdate() { - var featureSets = Collections.singletonList(featureSet1); - Job originalJob = makeJob("ext", featureSets, JobStatus.RUNNING); - JobUpdateTask jobUpdateTask = - makeTask(featureSets, Optional.of(originalJob), JobTargetStatus.RUNNING); - - when(jobManager.getJobStatus(originalJob)).thenReturn(JobStatus.ABORTING); - Job updated = jobUpdateTask.call(); - - assertThat(updated.getStatus(), equalTo(JobStatus.ABORTING)); - } - - @Test - public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { - var featureSets = Collections.singletonList(featureSet1); - JobUpdateTask jobUpdateTask = - spy(makeTask(featureSets, Optional.empty(), JobTargetStatus.RUNNING)); - doReturn("job").when(jobUpdateTask).createJobId(source, "test"); - - Job expectedInput = makeJob("", featureSets, JobStatus.PENDING); - Job expected = makeJob("", featureSets, JobStatus.ERROR); - - when(jobManager.startJob(expectedInput)) - .thenThrow(new RuntimeException("Something went wrong")); - - Job actual = jobUpdateTask.call(); - assertThat(actual, equalTo(expected)); - } - - @Test - public void shouldTimeout() { - var featureSets = Collections.singletonList(featureSet1); - var timeoutSeconds = 0L; - JobUpdateTask jobUpdateTask = - spy( - new JobUpdateTask( - featureSets, - source, - store, - Optional.empty(), - jobManager, - timeoutSeconds, - JobTargetStatus.RUNNING)); - - Job actual = jobUpdateTask.call(); - assertThat(actual, is(IsNull.nullValue())); - } - - @Test - public void featureSetsShouldBeUpdated() { - Job job = makeJob("", Collections.emptyList(), JobStatus.RUNNING); - - when(jobManager.getJobStatus(job)).thenReturn(JobStatus.RUNNING); - - JobUpdateTask jobUpdateTask = - makeTask(List.of(featureSet1), Optional.of(job), JobTargetStatus.RUNNING); - - jobUpdateTask.call(); - - FeatureSetJobStatus expectedStatus1 = new FeatureSetJobStatus(); - expectedStatus1.setJob(job); - expectedStatus1.setFeatureSet(featureSet1); - expectedStatus1.setVersion(0); - expectedStatus1.setDeliveryStatus( - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - - assertThat(job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1)); - - expectedStatus1.setDeliveryStatus(STATUS_DELIVERED); - 
job.getFeatureSetJobStatuses().forEach(j -> j.setDeliveryStatus(STATUS_DELIVERED)); - - JobUpdateTask jobUpdateTask2 = - makeTask(List.of(featureSet1, featureSet2), Optional.of(job), JobTargetStatus.RUNNING); - jobUpdateTask2.call(); - - FeatureSetJobStatus expectedStatus2 = new FeatureSetJobStatus(); - expectedStatus2.setJob(job); - expectedStatus2.setFeatureSet(featureSet2); - expectedStatus2.setVersion(featureSet2.getVersion()); - expectedStatus2.setDeliveryStatus( - FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS); - - assertThat( - job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1, expectedStatus2)); - } - - @Test - public void shouldStopJobIfTargetStatusIsAbort() { - var featureSets = Collections.singletonList(featureSet1); - Job originalJob = makeJob("ext", featureSets, JobStatus.RUNNING); - JobUpdateTask jobUpdateTask = - makeTask(featureSets, Optional.of(originalJob), JobTargetStatus.ABORTED); - - Job expected = makeJob("ext", featureSets, JobStatus.ABORTING); - - when(jobManager.getJobStatus(originalJob)).thenReturn(JobStatus.ABORTING); - when(jobManager.abortJob(originalJob)).thenReturn(expected); - - Job actual = jobUpdateTask.call(); - verify(jobManager, times(1)).abortJob(originalJob); - assertThat(actual, equalTo(expected)); - } -} diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 5f8403f797..0c182a2319 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -25,6 +25,7 @@ import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.googleapis.testing.auth.oauth2.MockGoogleCredential; import com.google.api.services.dataflow.Dataflow; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.protobuf.Duration; @@ -149,7 +150,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { expectedPipelineOptions.setAppName("DataflowJobManager"); expectedPipelineOptions.setLabels(defaults.getLabelsMap()); expectedPipelineOptions.setJobName(jobName); - expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); + expectedPipelineOptions.setStoresJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setSourceJson(printer.print(source)); ArgumentCaptor captor = ArgumentCaptor.forClass(ImportOptions.class); @@ -169,7 +170,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { .setExtId("") .setRunner(Runner.DATAFLOW) .setSource(Source.fromProto(source)) - .setStore(Store.fromProto(store)) + .setStores(ImmutableSet.of(Store.fromProto(store))) .setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus)) .setStatus(JobStatus.PENDING) .build(); @@ -202,7 +203,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { actualPipelineOptions.getMetricsExporterType(), equalTo(expectedPipelineOptions.getMetricsExporterType())); assertThat( - actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); + actualPipelineOptions.getStoresJson(), equalTo(expectedPipelineOptions.getStoresJson())); assertThat( actualPipelineOptions.getSourceJson(), equalTo(expectedPipelineOptions.getSourceJson())); assertThat( @@ -252,7 +253,7 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { 
            .setExtId("")
            .setRunner(Runner.DATAFLOW)
            .setSource(Source.fromProto(source))
-            .setStore(Store.fromProto(store))
+            .setStores(ImmutableSet.of(Store.fromProto(store)))
             .setFeatureSetJobStatuses(Sets.newHashSet(featureSetJobStatus))
             .setStatus(JobStatus.PENDING)
             .build();
diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java
index 5b70c59d69..e9db2fde0e 100644
--- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java
+++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java
@@ -26,6 +26,7 @@
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.protobuf.Duration;
 import com.google.protobuf.util.JsonFormat;
@@ -132,7 +133,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
     expectedPipelineOptions.setRunner(DirectRunner.class);
     expectedPipelineOptions.setBlockOnRun(false);
     expectedPipelineOptions.setTargetParallelism(1);
-    expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
+    expectedPipelineOptions.setStoresJson(Lists.newArrayList(printer.print(store)));
     expectedPipelineOptions.setProject("");
     expectedPipelineOptions.setSourceJson(printer.print(source));
 
@@ -149,7 +150,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
             .setExtId("")
             .setRunner(Runner.DIRECT)
             .setSource(Source.fromProto(source))
-            .setStore(Store.fromProto(store))
+            .setStores(ImmutableSet.of(Store.fromProto(store)))
             .setFeatureSetJobStatuses(makeFeatureSetJobStatus(FeatureSet.fromProto(featureSet)))
             .setStatus(JobStatus.PENDING)
             .build();
@@ -174,7 +175,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
         actualPipelineOptions.getMetricsExporterType(),
         equalTo(expectedPipelineOptions.getMetricsExporterType()));
     assertThat(
-        actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson()));
+        actualPipelineOptions.getStoresJson(), equalTo(expectedPipelineOptions.getStoresJson()));
     assertThat(
         actualPipelineOptions.getSourceJson(), equalTo(expectedPipelineOptions.getSourceJson()));
     assertThat(
diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
index 3f7267671a..c19641149b 100644
--- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
+++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
@@ -16,6 +16,8 @@
  */
 package feast.core.service;
 
+import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED;
+import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -25,6 +27,8 @@
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
 import feast.core.config.FeastProperties;
@@ -32,8 +36,7 @@
 import feast.core.dao.FeatureSetRepository;
 import feast.core.dao.JobRepository;
 import feast.core.dao.SourceRepository;
-import feast.core.job.JobManager;
-import feast.core.job.Runner;
+import feast.core.job.*;
 import feast.core.model.*;
 import feast.core.util.TestUtil;
 import feast.proto.core.CoreServiceProto.ListFeatureSetsRequest.Filter;
@@ -48,11 +51,7 @@
 import feast.proto.core.SourceProto.SourceType;
 import feast.proto.core.StoreProto;
 import feast.proto.core.StoreProto.Store.Subscription;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.CancellationException;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -167,7 +166,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep
             .setExtId("")
             .setRunner(Runner.DATAFLOW)
             .setSource(source)
-            .setStore(store)
+            .setStores(ImmutableSet.of(store))
             .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1, featureSet2))
             .setStatus(JobStatus.PENDING)
             .build();
@@ -220,7 +219,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException {
             .setExtId("")
             .setRunner(Runner.DATAFLOW)
             .setSource(source1)
-            .setStore(store)
+            .setStores(ImmutableSet.of(store))
             .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1))
             .setStatus(JobStatus.PENDING)
             .build();
@@ -234,7 +233,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException {
             .setExtId("")
             .setRunner(Runner.DATAFLOW)
             .setSource(source2)
-            .setStore(store)
+            .setStores(ImmutableSet.of(store))
             .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet2))
             .setStatus(JobStatus.PENDING)
             .build();
@@ -307,7 +306,7 @@ public void shouldGroupJobsBySourceAndIgnoreDuplicateSourceObjects()
             .setId("id")
             .setExtId("")
             .setSource(source1)
-            .setStore(store)
+            .setStores(ImmutableSet.of(store))
             .setRunner(Runner.DATAFLOW)
             .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1, featureSet2))
             .setStatus(JobStatus.PENDING)
@@ -369,7 +368,7 @@ public void shouldStopDuplicateJobsForSource() throws InvalidProtocolBufferExcep
             .setId(String.format("id%d", i))
             .setExtId(String.format("extid%d", i))
             .setSource(source)
-            .setStore(store)
+            .setStores(ImmutableSet.of(store))
             .setRunner(Runner.DATAFLOW)
             .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet))
             .setStatus(JobStatus.RUNNING)
@@ -397,10 +396,7 @@ public void shouldStopDuplicateJobsForSource() throws InvalidProtocolBufferExcep
 
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-                source.getType(),
-                source.getConfig(),
-                store.getName(),
-                JobStatus.getTerminalStates()))
+                source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.of(inputJobs.get(0)));
     when(jobRepository.findByStatus(JobStatus.RUNNING)).thenReturn(inputJobs);
 
@@ -448,7 +444,7 @@ public void shouldUseStoreSubscriptionToMapStore() throws InvalidProtocolBufferE
             .setExtId("")
             .setRunner(Runner.DATAFLOW)
             .setSource(source1)
-            .setStore(store1)
+            .setStores(ImmutableSet.of(store1))
             .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet1))
             .setStatus(JobStatus.PENDING)
             .build();
@@ -462,7 +458,7 @@ public void shouldUseStoreSubscriptionToMapStore() throws InvalidProtocolBufferE
             .setExtId("")
             .setRunner(Runner.DATAFLOW)
             .setSource(source2)
-            .setStore(store2)
+            .setStores(ImmutableSet.of(store2))
             .setFeatureSetJobStatuses(TestUtil.makeFeatureSetJobStatus(featureSet2))
             .setStatus(JobStatus.PENDING)
             .build();
@@ -493,17 +489,11 @@ public void shouldUseStoreSubscriptionToMapStore() throws InvalidProtocolBufferE
     when(jobRepository.findByStatus(JobStatus.RUNNING)).thenReturn(List.of());
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-                source1.getType(),
-                source1.getConfig(),
-                store1.getName(),
-                JobStatus.getTerminalStates()))
+                source1.getType(), source1.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.empty());
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-                source2.getType(),
-                source1.getConfig(),
-                store2.getName(),
-                JobStatus.getTerminalStates()))
+                source2.getType(), source2.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.empty());
 
     jcs.Poll();
@@ -524,14 +514,11 @@ public void shouldSendPendingFeatureSetToJobs() {
     fs1.setVersion(2);
 
     FeatureSetJobStatus status1 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_DELIVERED, 1);
     FeatureSetJobStatus status2 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_DELIVERED, 1);
     FeatureSetJobStatus status3 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.ABORTED, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 2);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.ABORTED, STATUS_DELIVERED, 2);
 
     // spec needs to be send
     fs1.getJobStatuses().addAll(ImmutableList.of(status1, status2, status3));
@@ -546,9 +533,7 @@ public void shouldSendPendingFeatureSetToJobs() {
         .addAll(
             ImmutableList.of(
                 TestUtil.CreateFeatureSetJobStatusWithJob(
-                    JobStatus.RUNNING,
-                    FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS,
-                    5)));
+                    JobStatus.RUNNING, STATUS_IN_PROGRESS, 5)));
 
     // feature set without running jobs attached
     FeatureSet fs3 =
@@ -558,9 +543,7 @@ public void shouldSendPendingFeatureSetToJobs() {
         .addAll(
             ImmutableList.of(
                 TestUtil.CreateFeatureSetJobStatusWithJob(
-                    JobStatus.ABORTED,
-                    FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS,
-                    5)));
+                    JobStatus.ABORTED, STATUS_IN_PROGRESS, 5)));
 
     when(featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING))
         .thenReturn(ImmutableList.of(fs1, fs2, fs3));
@@ -572,19 +555,13 @@ public void shouldSendPendingFeatureSetToJobs() {
     verify(kafkaTemplate, never()).sendDefault(eq(fs3.getReference()), any(FeatureSetSpec.class));
 
     assertThat(status1.getVersion(), is(2));
-    assertThat(
-        status1.getDeliveryStatus(),
-        is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS));
+    assertThat(status1.getDeliveryStatus(), is(STATUS_IN_PROGRESS));
 
     assertThat(status2.getVersion(), is(2));
-    assertThat(
-        status2.getDeliveryStatus(),
-        is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS));
+    assertThat(status2.getDeliveryStatus(), is(STATUS_IN_PROGRESS));
 
     assertThat(status3.getVersion(), is(2));
-    assertThat(
-        status3.getDeliveryStatus(),
-        is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));
+    assertThat(status3.getDeliveryStatus(), is(STATUS_DELIVERED));
   }
 
   @Test
@@ -596,8 +573,7 @@ public void shouldNotUpdateJobStatusVersionWhenKafkaUnavailable() {
     fsInTest.setVersion(2);
 
     FeatureSetJobStatus featureSetJobStatus =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_DELIVERED, 1);
     fsInTest.getJobStatuses().add(featureSetJobStatus);
 
     CancellationException exc = new CancellationException();
@@ -615,11 +591,9 @@ public void specAckListenerShouldDoNothingWhenMessageIsOutdated() {
         TestUtil.CreateFeatureSet(
             "fs", "project", Collections.emptyList(), Collections.emptyList());
     FeatureSetJobStatus j1 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_IN_PROGRESS, 1);
     FeatureSetJobStatus j2 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_IN_PROGRESS, 1);
 
     fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2));
 
@@ -631,10 +605,8 @@ public void specAckListenerShouldDoNothingWhenMessageIsOutdated() {
     jcs.listenAckFromJobs(newAckMessage(fsInTest.getReference(), 0, ""));
     jcs.listenAckFromJobs(newAckMessage(fsInTest.getReference(), -1, j1.getJob().getId()));
 
-    assertThat(
-        j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS));
-    assertThat(
-        j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS));
+    assertThat(j1.getDeliveryStatus(), is(STATUS_IN_PROGRESS));
+    assertThat(j2.getDeliveryStatus(), is(STATUS_IN_PROGRESS));
   }
 
   @Test
@@ -645,14 +617,11 @@ public void specAckListenerShouldUpdateFeatureSetStatus() {
     fsInTest.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING);
 
     FeatureSetJobStatus j1 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_IN_PROGRESS, 1);
     FeatureSetJobStatus j2 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.RUNNING, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.RUNNING, STATUS_IN_PROGRESS, 1);
     FeatureSetJobStatus j3 =
-        TestUtil.CreateFeatureSetJobStatusWithJob(
-            JobStatus.ABORTED, FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS, 1);
+        TestUtil.CreateFeatureSetJobStatusWithJob(JobStatus.ABORTED, STATUS_IN_PROGRESS, 1);
 
     fsInTest.getJobStatuses().addAll(Arrays.asList(j1, j2, j3));
 
@@ -663,19 +632,124 @@ public void specAckListenerShouldUpdateFeatureSetStatus() {
     jcs.listenAckFromJobs(
         newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j1.getJob().getId()));
 
-    assertThat(
-        j1.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));
+    assertThat(j1.getDeliveryStatus(), is(STATUS_DELIVERED));
     assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_PENDING));
 
     jcs.listenAckFromJobs(
         newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j2.getJob().getId()));
 
-    assertThat(
-        j2.getDeliveryStatus(), is(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));
+    assertThat(j2.getDeliveryStatus(), is(STATUS_DELIVERED));
     assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_READY));
   }
 
+  @Test
+  public void featureSetsShouldBeAllocated() {
+    FeatureSetProto.FeatureSet.Builder fsBuilder =
+        FeatureSetProto.FeatureSet.newBuilder().setMeta(FeatureSetMeta.newBuilder());
+    FeatureSetSpec.Builder specBuilder = FeatureSetSpec.newBuilder();
+
+    FeatureSet featureSet1 =
+        FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet1")).build());
+    FeatureSet featureSet2 =
+        FeatureSet.fromProto(fsBuilder.setSpec(specBuilder.setName("featureSet2")).build());
+
+    featureSet2.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY);
+    featureSet2.setVersion(5);
+
+    Job job = new Job();
+    jcs.allocateFeatureSets(job, ImmutableSet.of(featureSet1));
+
+    FeatureSetJobStatus expectedStatus1 = new FeatureSetJobStatus();
+    expectedStatus1.setJob(job);
+    expectedStatus1.setFeatureSet(featureSet1);
+    expectedStatus1.setVersion(0);
+    expectedStatus1.setDeliveryStatus(STATUS_IN_PROGRESS);
+
+    assertThat(job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1));
+
+    expectedStatus1.setDeliveryStatus(STATUS_DELIVERED);
+    job.getFeatureSetJobStatuses().forEach(j -> j.setDeliveryStatus(STATUS_DELIVERED));
+
+    jcs.allocateFeatureSets(job, ImmutableSet.of(featureSet1, featureSet2));
+
+    FeatureSetJobStatus expectedStatus2 = new FeatureSetJobStatus();
+    expectedStatus2.setJob(job);
+    expectedStatus2.setFeatureSet(featureSet2);
+    expectedStatus2.setVersion(featureSet2.getVersion());
+    expectedStatus2.setDeliveryStatus(STATUS_IN_PROGRESS);
+
+    assertThat(
+        job.getFeatureSetJobStatuses(), containsInAnyOrder(expectedStatus1, expectedStatus2));
+  }
+
+  @Test
+  public void shouldCheckStatusOfAbortingJob() {
+    Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);
+    Store store = TestUtil.createStore("store", Collections.emptyList());
+
+    Job job =
+        Job.builder()
+            .setStatus(JobStatus.ABORTING)
+            .setFeatureSetJobStatuses(new HashSet<>())
+            .setExtId("extId")
+            .build();
+
+    when(jobRepository
+            .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+                source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
+        .thenReturn(Optional.of(job));
+
+    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+
+    assertThat("CheckStatus is expected", tasks.get(0) instanceof UpdateJobStatusTask);
+  }
+
+  @Test
+  public void shouldUpgradeJobWhenNeeded() {
+    Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);
+    Store store = TestUtil.createStore("store", Collections.emptyList());
+
+    Job job =
+        Job.builder()
+            .setStatus(JobStatus.RUNNING)
+            .setFeatureSetJobStatuses(new HashSet<>())
+            .setStores(new HashSet<>())
+            .setExtId("extId")
+            .build();
+
+    when(jobRepository
+            .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+                source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
+        .thenReturn(Optional.of(job));
+
+    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+
+    assertThat("UpgradeTask is expected", tasks.get(0) instanceof UpgradeJobTask);
+  }
+
+  @Test
+  public void shouldCreateJobIfNoRunning() {
+    Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);
+    Store store = TestUtil.createStore("store", Collections.emptyList());
+
+    Job job =
+        Job.builder()
+            .setStatus(JobStatus.ERROR)
+            .setFeatureSetJobStatuses(new HashSet<>())
+            .setExtId("")
+            .build();
+
+    when(jobRepository
+            .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+                source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
+        .thenReturn(Optional.of(job));
+
+    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+
+    assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask);
+  }
+
   private ConsumerRecord newAckMessage(
       String key, int version, String jobName) {
     return new ConsumerRecord<>(
diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java
index 02ebe611b3..967949e565 100644
--- a/core/src/test/java/feast/core/service/JobServiceTest.java
+++ b/core/src/test/java/feast/core/service/JobServiceTest.java
@@ -26,6 +26,7 @@
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
 import feast.core.dao.JobRepository;
@@ -122,7 +123,7 @@ public void setupSpecService() {
   public void setupJobRepository() {
     when(this.jobRepository.findById(this.job.getId())).thenReturn(Optional.of(this.job));
 
-    when(this.jobRepository.findByStoreName(this.dataStore.getName()))
+    when(this.jobRepository.findByStoresName(this.dataStore.getName()))
         .thenReturn(Arrays.asList(this.job));
     when(this.jobRepository.findByFeatureSetJobStatusesIn(
             Lists.newArrayList((this.featureSet.getJobStatuses()))))
@@ -152,7 +153,7 @@ private Job newDummyJob(String id, String extId, JobStatus status) {
         .setExtId(extId)
         .setRunner(Runner.DATAFLOW)
         .setSource(this.dataSource)
-        .setStore(this.dataStore)
+        .setStores(ImmutableSet.of(this.dataStore))
        .setFeatureSetJobStatuses(makeFeatureSetJobStatus(this.featureSet))
        .setStatus(status)
        .build();
diff --git a/core/src/test/java/feast/core/util/TestUtil.java b/core/src/test/java/feast/core/util/TestUtil.java
index 7a88ccdf77..5b3bf6b10c 100644
--- a/core/src/test/java/feast/core/util/TestUtil.java
+++ b/core/src/test/java/feast/core/util/TestUtil.java
@@ -64,8 +64,8 @@ public static Source createKafkaSource(String boostrapServers, String topic, boo
             .setType(SourceProto.SourceType.KAFKA)
             .setKafkaSourceConfig(
                 SourceProto.KafkaSourceConfig.newBuilder()
-                    .setBootstrapServers("kafka:9092")
-                    .setTopic("my-topic")
+                    .setBootstrapServers(boostrapServers)
+                    .setTopic(topic)
                     .build())
             .build(),
         true);
diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java
index b5ee81c562..2b3abf261f 100644
--- a/ingestion/src/main/java/feast/ingestion/ImportJob.java
+++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -45,6 +45,7 @@
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.transforms.*;
@@ -90,7 +91,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
 
     log.info("Starting import job with settings: \n{}", options.toString());
 
-    List<Store> stores = SpecUtil.parseStoreJsonList(options.getStoreJson());
+    List<Store> stores = SpecUtil.parseStoreJsonList(options.getStoresJson());
     Source source = SpecUtil.parseSourceJson(options.getSourceJson());
     SpecsStreamingUpdateConfig specsStreamingUpdateConfig =
         SpecUtil.parseSpecsStreamingUpdateConfig(options.getSpecsStreamingUpdateConfigJson());
@@ -157,6 +158,7 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti
       WriteResult writeFeatureRows =
           storeAllocatedRows
               .get(storeTags.get(store))
+              .setCoder(ProtoCoder.of(FeatureRow.class))
               .apply("WriteFeatureRowToStore", featureSink.writer());
 
       // Step 7. Write FailedElements to a dead letter table in BigQuery.
diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
index 9ee1ff891a..3a779b1e99 100644
--- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
+++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
@@ -69,9 +69,9 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,
           + "https://developers.google.com/protocol-buffers/docs/proto3#json"
           + "Please minify and remove all insignificant whitespace such as newline in the JSON string"
           + "to prevent error when parsing the options")
-  List<String> getStoreJson();
+  List<String> getStoresJson();
 
-  void setStoreJson(List<String> storeJson);
+  void setStoresJson(List<String> storeJson);
 
   @Description(
       "(Optional) Deadletter elements will be written to this BigQuery table."
diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
index 7f0806bd66..1cfd29b541 100644
--- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
+++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java
@@ -194,7 +194,7 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
     options.setSpecsStreamingUpdateConfigJson(
        JsonFormat.printer().print(specsStreamingUpdateConfig));
     options.setSourceJson(JsonFormat.printer().print(featureSource));
-    options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis)));
+    options.setStoresJson(Collections.singletonList(JsonFormat.printer().print(redis)));
     options.setDefaultFeastProject("myproject");
     options.setProject("");
     options.setBlockOnRun(false);
diff --git a/protos/feast/core/IngestionJob.proto b/protos/feast/core/IngestionJob.proto
index 0ce0f9681c..51309712fe 100644
--- a/protos/feast/core/IngestionJob.proto
+++ b/protos/feast/core/IngestionJob.proto
@@ -39,7 +39,7 @@ message IngestionJob {
     // Source this job is reading from.
     feast.core.Source source = 5;
     // Store this job is writing to.
-    feast.core.Store store = 6;
+    repeated feast.core.Store stores = 6;
 }
 
 // Status of a Feast Ingestion Job
diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py
index c529d97ffd..720662acb6 100644
--- a/sdk/python/tests/test_client.py
+++ b/sdk/python/tests/test_client.py
@@ -539,7 +539,7 @@ def test_list_ingest_jobs(self, mocked_client, mocker):
                         bootstrap_servers="localhost:9092", topic="topic"
                     ),
                 ),
-                store=Store(name="redis"),
+                stores=[Store(name="redis")],
             )
         ]
     ),