Skip to content

Commit

Permalink
job grouping strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
pyalex committed Jun 24, 2020
1 parent 89883d4 commit 45b74c4
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 70 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static class JobProperties {
/* The active Apache Beam runner name. This name references one instance of the Runner class */
private String activeRunner;

/* If true only one IngestionJob would be created per source with all subscribed stores in it */
private Boolean consolidateJobsPerSource = false;

/** List of configured job runners. */
private List<Runner> runners = new ArrayList<>();

Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.dao.JobRepository;
import feast.core.job.ConsolidatedJobStrategy;
import feast.core.job.JobGroupingStrategy;
import feast.core.job.JobManager;
import feast.core.job.JobPerStoreStrategy;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
Expand Down Expand Up @@ -64,6 +68,16 @@ public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateCo
.build();
}

@Bean
public JobGroupingStrategy getJobGroupingStrategy(
FeastProperties feastProperties, JobRepository jobRepository) {
if (feastProperties.getJobs().getConsolidateJobsPerSource()) {
return new ConsolidatedJobStrategy(jobRepository);
} else {
return new JobPerStoreStrategy(jobRepository);
}
}

/**
* Get a JobManager according to the runner type and Dataflow configuration.
*
Expand Down
78 changes: 78 additions & 0 deletions core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import feast.core.dao.JobRepository;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

public class ConsolidatedJobStrategy implements JobGroupingStrategy {
private final JobRepository jobRepository;

public ConsolidatedJobStrategy(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}

@Override
public Job getOrCreateJob(Source source, Set<Store> stores) {
return jobRepository
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
}

@Override
public String createJobId(Job job) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-%s",
job.getSource().getTypeString(),
Objects.hashCode(job.getSource().getConfig()),
dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}

@Override
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream) {
Map<Source, Set<Store>> map =
stream.collect(
Collectors.groupingBy(
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));

return map.entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue()))
.collect(Collectors.toList());
}
}
12 changes: 0 additions & 12 deletions core/src/main/java/feast/core/job/CreateJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import java.time.Instant;
import java.util.Objects;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -43,12 +41,10 @@ public class CreateJobTask implements JobTask {

@Override
public Job call() {
String jobId = createJobId(job.getSource());
String runnerName = jobManager.getRunnerType().toString();

job.setRunner(jobManager.getRunnerType());
job.setStatus(JobStatus.PENDING);
job.setId(jobId);

try {
JobTask.logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);
Expand All @@ -73,12 +69,4 @@ public Job call() {
return job;
}
}

String createJobId(Source source) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-%s", source.getTypeString(), Objects.hashCode(source.getConfig()), dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}
}
34 changes: 34 additions & 0 deletions core/src/main/java/feast/core/job/JobGroupingStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import feast.core.model.Job;
import feast.core.model.Source;
import feast.core.model.Store;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

public interface JobGroupingStrategy {
/** Get the non terminated ingestion job ingesting for given source and stores. */
public Job getOrCreateJob(Source source, Set<Store> stores);

public String createJobId(Job job);

public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream);
}
80 changes: 80 additions & 0 deletions core/src/main/java/feast/core/job/JobPerStoreStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import com.google.common.collect.Lists;
import feast.core.dao.JobRepository;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

public class JobPerStoreStrategy implements JobGroupingStrategy {
private final JobRepository jobRepository;

public JobPerStoreStrategy(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}

@Override
public Job getOrCreateJob(Source source, Set<Store> stores) {
ArrayList<Store> storesList = Lists.newArrayList(stores);
if (storesList.size() != 1) {
throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy");
}
Store store = storesList.get(0);

return jobRepository
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStoreName(store.getName())
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
}

@Override
public String createJobId(Job job) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-to-%s-%s",
job.getSource().getTypeString(),
Objects.hashCode(job.getSource().getConfig()),
job.getStoreName(),
dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}

@Override
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream) {
return stream.map(p -> Pair.of(p.getLeft(), Set.of(p.getRight()))).collect(Collectors.toList());
}
}
Loading

0 comments on commit 45b74c4

Please sign in to comment.