Skip to content

Commit

Permalink
one job per source (#817)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleksii Moskalenko authored Jun 24, 2020
1 parent 74bcd3f commit 89883d4
Show file tree
Hide file tree
Showing 26 changed files with 843 additions and 692 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/dao/JobRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ public interface JobRepository extends JpaRepository<Job, String> {
List<Job> findByFeatureSetJobStatusesIn(List<FeatureSetJobStatus> featureSetsJobStatuses);

// find jobs by feast store name
List<Job> findByStoreName(String storeName);
List<Job> findByStoresName(String storeName);
}
84 changes: 84 additions & 0 deletions core/src/main/java/feast/core/job/CreateJobTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import feast.core.log.Action;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import java.time.Instant;
import java.util.Objects;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Task that starts recently created {@link Job} by using {@link JobManager}. Since it's new job its
* Id being generated from attached {@link Source} and updated accordingly in-place.
*/
@Getter
@Setter
@Builder(setterPrefix = "set")
public class CreateJobTask implements JobTask {
final Logger log = LoggerFactory.getLogger(CreateJobTask.class);

private Job job;
private JobManager jobManager;

@Override
public Job call() {
String jobId = createJobId(job.getSource());
String runnerName = jobManager.getRunnerType().toString();

job.setRunner(jobManager.getRunnerType());
job.setStatus(JobStatus.PENDING);
job.setId(jobId);

try {
JobTask.logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);

job = jobManager.startJob(job);
var extId = job.getExtId();
if (extId.isEmpty()) {
throw new RuntimeException(
String.format("Could not submit job: \n%s", "unable to retrieve job external id"));
}

var auditMessage = "Job submitted to runner %s with ext id %s.";
JobTask.logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName, extId);

return job;
} catch (Exception e) {
log.error(e.getMessage());
var auditMessage = "Job failed to be submitted to runner %s. Job status changed to ERROR.";
JobTask.logAudit(Action.STATUS_CHANGE, job, auditMessage, runnerName);

job.setStatus(JobStatus.ERROR);
return job;
}
}

String createJobId(Source source) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-%s", source.getTypeString(), Objects.hashCode(source.getConfig()), dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}
}
32 changes: 32 additions & 0 deletions core/src/main/java/feast/core/job/JobTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
import feast.core.model.Job;
import java.util.concurrent.Callable;

public interface JobTask extends Callable<Job> {
static void logAudit(Action action, Job job, String detail, Object... args) {
AuditLogger.log(Resource.JOB, job.getId(), action, detail, args);
}

@Override
Job call() throws RuntimeException;
}
247 changes: 0 additions & 247 deletions core/src/main/java/feast/core/job/JobUpdateTask.java

This file was deleted.

Loading

0 comments on commit 89883d4

Please sign in to comment.