Skip to content

Commit

Permalink
Allow ingestion job grouping/consolidation to be configurable (#825)
Browse files Browse the repository at this point in the history
* job grouping strategy

* apidocs & tests

* fix some old docs

* adding apidoc in Spring Config
  • Loading branch information
Oleksii Moskalenko authored Jun 25, 2020
1 parent 89883d4 commit 26c8070
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 107 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static class JobProperties {
/* The active Apache Beam runner name. This name references one instance of the Runner class */
private String activeRunner;

/* If true only one IngestionJob would be created per source with all subscribed stores in it */
private Boolean consolidateJobsPerSource = false;

/** List of configured job runners. */
private List<Runner> runners = new ArrayList<>();

Expand Down
24 changes: 24 additions & 0 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.dao.JobRepository;
import feast.core.job.ConsolidatedJobStrategy;
import feast.core.job.JobGroupingStrategy;
import feast.core.job.JobManager;
import feast.core.job.JobPerStoreStrategy;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
Expand Down Expand Up @@ -64,6 +68,26 @@ public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateCo
.build();
}

/**
* Returns Grouping Strategy which is responsible for how Ingestion would be split across job
* instances (or how Sources and Stores would be grouped together). Choosing strategy depends on
* FeastProperties config "feast.jobs.consolidate-jobs-per-source".
*
* @param feastProperties feast config properties
* @param jobRepository repository required by strategy
* @return JobGroupingStrategy
*/
@Bean
public JobGroupingStrategy getJobGroupingStrategy(
FeastProperties feastProperties, JobRepository jobRepository) {
Boolean shouldConsolidateJobs = feastProperties.getJobs().getConsolidateJobsPerSource();
if (shouldConsolidateJobs) {
return new ConsolidatedJobStrategy(jobRepository);
} else {
return new JobPerStoreStrategy(jobRepository);
}
}

/**
* Get a JobManager according to the runner type and Dataflow configuration.
*
Expand Down
85 changes: 85 additions & 0 deletions core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import feast.core.dao.JobRepository;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

/**
* In this strategy one Ingestion Job per source is created. All stores that subscribed to
* FeatureSets from this source will be included as sinks in this consolidated Job.
*
* <p>JobId will contain only source parameters (type + config). StoreName will remain empty in Job
* table.
*/
public class ConsolidatedJobStrategy implements JobGroupingStrategy {
private final JobRepository jobRepository;

public ConsolidatedJobStrategy(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}

@Override
public Job getOrCreateJob(Source source, Set<Store> stores) {
return jobRepository
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
}

@Override
public String createJobId(Job job) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-%s",
job.getSource().getTypeString(),
Objects.hashCode(job.getSource().getConfig()),
dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}

@Override
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream) {
Map<Source, Set<Store>> map =
stream.collect(
Collectors.groupingBy(
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));

return map.entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue()))
.collect(Collectors.toList());
}
}
18 changes: 1 addition & 17 deletions core/src/main/java/feast/core/job/CreateJobTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,13 @@
import feast.core.log.Action;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import java.time.Instant;
import java.util.Objects;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Task that starts recently created {@link Job} by using {@link JobManager}. Since it's new job its
* Id being generated from attached {@link Source} and updated accordingly in-place.
*/
/** Task that starts recently created {@link Job} by using {@link JobManager}. */
@Getter
@Setter
@Builder(setterPrefix = "set")
Expand All @@ -43,12 +37,10 @@ public class CreateJobTask implements JobTask {

@Override
public Job call() {
String jobId = createJobId(job.getSource());
String runnerName = jobManager.getRunnerType().toString();

job.setRunner(jobManager.getRunnerType());
job.setStatus(JobStatus.PENDING);
job.setId(jobId);

try {
JobTask.logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);
Expand All @@ -73,12 +65,4 @@ public Job call() {
return job;
}
}

String createJobId(Source source) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-%s", source.getTypeString(), Objects.hashCode(source.getConfig()), dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}
}
38 changes: 38 additions & 0 deletions core/src/main/java/feast/core/job/JobGroupingStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import feast.core.model.Job;
import feast.core.model.Source;
import feast.core.model.Store;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

/**
* Strategy interface that defines how responsibility for sources and stores will be distributed
* across Ingestion Jobs.
*/
public interface JobGroupingStrategy {
/** Get the non terminated ingestion job ingesting for given source and stores. */
public Job getOrCreateJob(Source source, Set<Store> stores);
/** Create unique JobId that would be used as key in communications with JobRunner */
public String createJobId(Job job);
/* Distribute given sources and stores across jobs. One yielded Pair - one created Job **/
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream);
}
85 changes: 85 additions & 0 deletions core/src/main/java/feast/core/job/JobPerStoreStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import com.google.common.collect.Lists;
import feast.core.dao.JobRepository;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

/**
* In this strategy one job per Source-Store pair is created.
*
* <p>JobId is generated accordingly from Source (type+config) and StoreName.
*/
public class JobPerStoreStrategy implements JobGroupingStrategy {
private final JobRepository jobRepository;

public JobPerStoreStrategy(JobRepository jobRepository) {
this.jobRepository = jobRepository;
}

@Override
public Job getOrCreateJob(Source source, Set<Store> stores) {
ArrayList<Store> storesList = Lists.newArrayList(stores);
if (storesList.size() != 1) {
throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy");
}
Store store = storesList.get(0);

return jobRepository
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates())
.orElseGet(
() ->
Job.builder()
.setSource(source)
.setStoreName(store.getName())
.setStores(stores)
.setFeatureSetJobStatuses(new HashSet<>())
.build());
}

@Override
public String createJobId(Job job) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-to-%s-%s",
job.getSource().getTypeString(),
Objects.hashCode(job.getSource().getConfig()),
job.getStoreName(),
dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}

@Override
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream) {
return stream.map(p -> Pair.of(p.getLeft(), Set.of(p.getRight()))).collect(Collectors.toList());
}
}
Loading

0 comments on commit 26c8070

Please sign in to comment.