Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use JobManager's backend as persistent storage and source of truth #903

Merged
merged 10 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
@Data
@AllArgsConstructor
public class FeatureSetReference implements Serializable {
public static String PROJECT_DEFAULT_NAME = "default";

/* Name of project to which this featureSet is assigned */
private String projectName;
/* Name of FeatureSet */
Expand All @@ -38,9 +40,14 @@ public class FeatureSetReference implements Serializable {
/** No-args constructor (class is Serializable; presumably required by serializers — confirm). */
public FeatureSetReference() {}

/**
 * Creates a reference to a FeatureSet with an explicit version.
 *
 * <p>Falls back to {@code PROJECT_DEFAULT_NAME} when no project is specified. The original
 * implementation only checked for an empty string and would throw a NullPointerException for a
 * null projectName; null is now treated the same as empty.
 *
 * @param projectName project the FeatureSet belongs to; null or empty means the default project
 * @param featureSetName name of the FeatureSet
 * @param version version of the FeatureSet
 * @return reference to the given (project, featureSet, version) triple
 */
public static FeatureSetReference of(String projectName, String featureSetName, Integer version) {
  projectName =
      (projectName == null || projectName.isEmpty()) ? PROJECT_DEFAULT_NAME : projectName;
  return new FeatureSetReference(projectName, featureSetName, version);
}

/**
 * Creates a version-agnostic reference (version fixed at -1).
 *
 * @param projectName project the FeatureSet belongs to
 * @param featureSetName name of the FeatureSet
 * @return reference with a sentinel version of -1
 */
public static FeatureSetReference of(String projectName, String featureSetName) {
  return FeatureSetReference.of(projectName, featureSetName, -1);
}

/** Returns the string form of this reference: {@code <projectName>/<featureSetName>}. */
public String getReference() {
  return getProjectName() + "/" + getFeatureSetName();
}
Expand Down
13 changes: 13 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -381,5 +382,17 @@
<version>3.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.6.6</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
14 changes: 12 additions & 2 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,18 @@ public static class JobProperties {
/* The active Apache Beam runner name. This name references one instance of the Runner class */
private String activeRunner;

/* If true only one IngestionJob would be created per source with all subscribed stores in it */
private Boolean consolidateJobsPerSource = false;
/* Job Coordinator related properties */
private CoordinatorProperties coordinator;

@Getter
@Setter
public static class CoordinatorProperties {
/* If true only one IngestionJob would be created per source with all subscribed stores in it */
private Boolean consolidateJobsPerSource = false;

/* Labels to identify jobs managed by this job coordinator */
private Map<String, String> jobSelector = new HashMap<>();
woop marked this conversation as resolved.
Show resolved Hide resolved
}

/** List of configured job runners. */
private List<Runner> runners = new ArrayList<>();
Expand Down
14 changes: 9 additions & 5 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.dao.JobRepository;
import feast.core.job.ConsolidatedJobStrategy;
import feast.core.job.JobGroupingStrategy;
import feast.core.job.JobManager;
import feast.core.job.JobPerStoreStrategy;
import feast.core.job.JobRepository;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
Expand Down Expand Up @@ -80,7 +80,8 @@ public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateCo
@Bean
public JobGroupingStrategy getJobGroupingStrategy(
FeastProperties feastProperties, JobRepository jobRepository) {
Boolean shouldConsolidateJobs = feastProperties.getJobs().getConsolidateJobsPerSource();
Boolean shouldConsolidateJobs =
feastProperties.getJobs().getCoordinator().getConsolidateJobsPerSource();
if (shouldConsolidateJobs) {
return new ConsolidatedJobStrategy(jobRepository);
} else {
Expand All @@ -103,17 +104,20 @@ public JobManager getJobManager(
JobProperties jobProperties = feastProperties.getJobs();
FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
Map<String, Object> runnerConfigOptions = runner.getOptions();
String configJson = gson.toJson(runnerConfigOptions);

FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();
String configJson = gson.toJson(runnerConfigOptions);

switch (runner.getType()) {
case DATAFLOW:
DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions =
DataflowRunnerConfigOptions.newBuilder();
JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions);
return new DataflowJobManager(
dataflowRunnerConfigOptions.build(), metrics, specsStreamingUpdateConfig);
return DataflowJobManager.of(
dataflowRunnerConfigOptions.build(),
metrics,
specsStreamingUpdateConfig,
jobProperties.getCoordinator().getJobSelector());
case DIRECT:
DirectRunnerConfigOptions.Builder directRunnerConfigOptions =
DirectRunnerConfigOptions.newBuilder();
Expand Down

This file was deleted.

46 changes: 26 additions & 20 deletions core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
*/
package feast.core.job;

import feast.core.dao.JobRepository;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -45,35 +43,43 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) {
}

@Override
public Job getOrCreateJob(Source source, Set<Store> stores) {
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
return jobRepository
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
.findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source, null, JobStatus.getTerminalStates())
.orElseGet(
() -> {
Job job =
Job.builder().setSource(source).setFeatureSetJobStatuses(new HashSet<>()).build();
job.setStores(stores);
return job;
});
() ->
Job.builder()
.setId(createJobId(source))
.setSource(source)
.setStores(
stores.stream()
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
.build());
}

@Override
public String createJobId(Job job) {
private String createJobId(SourceProto.Source source) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-%s",
job.getSource().getTypeString(),
Objects.hashCode(job.getSource().getConfig()),
source.getType().getValueDescriptor().getName(),
Objects.hash(
source.getKafkaSourceConfig().getBootstrapServers(),
source.getKafkaSourceConfig().getTopic()),
dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}

@Override
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream) {
Map<Source, Set<Store>> map =
/** Creates a unique job id derived from the job's source (delegates to the source-based overload). */
public String createJobId(Job job) {
  return createJobId(job.getSource());
}

@Override
public Iterable<Pair<SourceProto.Source, Set<StoreProto.Store>>> collectSingleJobInput(
Stream<Pair<SourceProto.Source, StoreProto.Store>> stream) {
Map<SourceProto.Source, Set<StoreProto.Store>> map =
stream.collect(
Collectors.groupingBy(
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
Expand Down
122 changes: 122 additions & 0 deletions core/src/main/java/feast/core/job/InMemoryJobRepository.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import com.google.common.collect.Lists;
import feast.common.models.FeatureSetReference;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.proto.core.SourceProto;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* Keeps state of all jobs managed by current application in memory. On start loads persistent state
* through JobManager from JobManager backend.
pyalex marked this conversation as resolved.
Show resolved Hide resolved
*
* <p>E.g., with Dataflow runner all jobs with their state stored on Google's side and accessible
* via API. So we don't need to persist this state ourselves. Instead we just fetch it once and then
* we apply same changes to dataflow (via JobManager) and to InMemoryRepository to keep them in
* sync.
*
* <p>Provides flexible access to objects via JPA-like filtering API.
*/
@Component
public class InMemoryJobRepository implements JobRepository {
private final JobManager jobManager;

/** Internal storage for all jobs mapped by their Id */
private Map<String, Job> storage;
pyalex marked this conversation as resolved.
Show resolved Hide resolved

@Autowired
public InMemoryJobRepository(JobManager jobManager) {
this.jobManager = jobManager;
this.storage = new HashMap<>();

this.storage =
this.jobManager.listRunningJobs().stream().collect(Collectors.toMap(Job::getId, j -> j));
}

/**
* Returns single job that has given source, store with given name ans its status is not in given
* statuses. We expect this parameters to specify only one RUNNING job (most of the time). But in
* case there're many - we return latest updated one.
*
* @return job that matches given parameters if it's present
*/
@Override
public Optional<Job> findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
SourceProto.Source source, String storeName, Collection<JobStatus> statuses) {
return this.storage.values().stream()
.filter(
j ->
j.getSource().equals(source)
&& (storeName == null || j.getStores().containsKey(storeName))
&& (!statuses.contains(j.getStatus())))
.max(Comparator.comparing(Job::getLastUpdated));
}

private List<Job> findWithFilter(Predicate<Job> p) {
return this.storage.values().stream().filter(p).collect(Collectors.toList());
}

/** Find Jobs that have given status */
@Override
public List<Job> findByStatus(JobStatus status) {
return this.findWithFilter(j -> j.getStatus().equals(status));
}

/**
* Find Jobs that have given FeatureSet (specified by {@link FeatureSetReference} allocated to it.
*/
@Override
public List<Job> findByFeatureSetReference(FeatureSetReference reference) {
return this.findWithFilter(j -> j.getFeatureSetDeliveryStatuses().containsKey(reference));
}

/** Find Jobs that have one of the stores with given name */
@Override
public List<Job> findByJobStoreName(String storeName) {
return this.findWithFilter(j -> j.getStores().containsKey(storeName));
}

/** Find by Job's Id */
@Override
public Optional<Job> findById(String jobId) {
return Optional.ofNullable(this.storage.get(jobId));
}

@Override
public List<Job> findAll() {
return Lists.newArrayList(this.storage.values());
}

@Override
public void add(Job job) {
job.preSave();

this.storage.put(job.getId(), job);
}

@Override
public void deleteAll() {
this.storage.clear();
}
}
12 changes: 6 additions & 6 deletions core/src/main/java/feast/core/job/JobGroupingStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package feast.core.job;

import feast.core.model.Job;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -29,10 +29,10 @@
*/
public interface JobGroupingStrategy {
/** Get the non terminated ingestion job ingesting for given source and stores. */
public Job getOrCreateJob(Source source, Set<Store> stores);
Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores);
/** Create unique JobId that would be used as key in communications with JobRunner */
public String createJobId(Job job);
String createJobId(Job job);
/* Distribute given sources and stores across jobs. One yielded Pair - one created Job **/
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream);
Iterable<Pair<SourceProto.Source, Set<StoreProto.Store>>> collectSingleJobInput(
Stream<Pair<SourceProto.Source, StoreProto.Store>> stream);
}
8 changes: 8 additions & 0 deletions core/src/main/java/feast/core/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import feast.core.model.Job;
import feast.core.model.JobStatus;
import java.util.List;

public interface JobManager {

Expand Down Expand Up @@ -70,4 +71,11 @@ public interface JobManager {
* @return job status.
*/
JobStatus getJobStatus(Job job);

/**
 * Lists jobs in the RUNNING state, as reported by this JobManager's backend.
 *
 * @return list of running jobs
 */
List<Job> listRunningJobs();
}
Loading