Skip to content

Commit

Permalink
in memory job repository
Browse files Browse the repository at this point in the history
fixes after rebase

fix python tests
  • Loading branch information
pyalex committed Aug 2, 2020
1 parent ffd0647 commit 3532dcf
Show file tree
Hide file tree
Showing 45 changed files with 1,300 additions and 1,948 deletions.
1 change: 1 addition & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public static FeatureSetReference of(String projectName, String featureSetName,
return new FeatureSetReference(projectName, featureSetName, version);
}

/**
 * Creates a reference from a project name and a feature set name only.
 *
 * <p>Delegates to the three-argument factory, passing {@code -1} as the version.
 * NOTE(review): -1 looks like a "version not specified" marker - confirm against callers.
 *
 * @param projectName name of the project
 * @param featureSetName name of the feature set
 * @return a new reference whose version is {@code -1}
 */
public static FeatureSetReference of(String projectName, String featureSetName) {
  return of(projectName, featureSetName, -1);
}

/**
 * Builds the textual reference of this feature set.
 *
 * @return the project name and the feature set name joined by a single {@code /}
 */
public String getReference() {
  return getProjectName() + "/" + getFeatureSetName();
}
Expand Down
13 changes: 13 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -381,5 +382,17 @@
<version>3.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.6.6</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
14 changes: 12 additions & 2 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,18 @@ public static class JobProperties {
/* The active Apache Beam runner name. This name references one instance of the Runner class */
private String activeRunner;

/* If true only one IngestionJob would be created per source with all subscribed stores in it */
private Boolean consolidateJobsPerSource = false;
/* Job Coordinator related properties */
private CoordinatorProperties coordinator;

/**
 * Job Coordinator related properties.
 *
 * <p>Accessors are generated by Lombok ({@code @Getter}/{@code @Setter}); the field names below
 * therefore define the accessor names used by callers.
 */
@Getter
@Setter
public static class CoordinatorProperties {
/** If true only one IngestionJob would be created per source with all subscribed stores in it. */
private Boolean consolidateJobsPerSource = false;

/** Labels to identify jobs managed by this job coordinator. Empty by default. */
private Map<String, String> jobSelector = new HashMap<>();
}

/** List of configured job runners. */
private List<Runner> runners = new ArrayList<>();
Expand Down
26 changes: 20 additions & 6 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.dao.JobRepository;
import feast.core.job.ConsolidatedJobStrategy;
import feast.core.job.JobGroupingStrategy;
import feast.core.job.JobManager;
import feast.core.job.JobPerStoreStrategy;
import feast.core.job.JobRepository;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.direct.DirectJobRegistry;
import feast.core.job.direct.DirectRunnerJobManager;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
import feast.proto.core.SourceProto;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -80,7 +81,8 @@ public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateCo
@Bean
public JobGroupingStrategy getJobGroupingStrategy(
FeastProperties feastProperties, JobRepository jobRepository) {
Boolean shouldConsolidateJobs = feastProperties.getJobs().getConsolidateJobsPerSource();
Boolean shouldConsolidateJobs =
feastProperties.getJobs().getCoordinator().getConsolidateJobsPerSource();
if (shouldConsolidateJobs) {
return new ConsolidatedJobStrategy(jobRepository);
} else {
Expand All @@ -103,21 +105,33 @@ public JobManager getJobManager(
JobProperties jobProperties = feastProperties.getJobs();
FeastProperties.JobProperties.Runner runner = jobProperties.getActiveRunner();
Map<String, Object> runnerConfigOptions = runner.getOptions();
String configJson = gson.toJson(runnerConfigOptions);

FeastProperties.MetricsProperties metrics = jobProperties.getMetrics();

switch (runner.getType()) {
case DATAFLOW:
// Retrieve labels for options to extend them
Map<String, String> jobLabels =
(Map<String, String>) runnerConfigOptions.getOrDefault("labels", new HashMap());
// Merge Job Selector Labels into runner options
// to create jobs with the same set of labels
jobProperties.getCoordinator().getJobSelector().forEach(jobLabels::put);
runnerConfigOptions.put("labels", jobLabels);

String configJson = gson.toJson(runnerConfigOptions);

DataflowRunnerConfigOptions.Builder dataflowRunnerConfigOptions =
DataflowRunnerConfigOptions.newBuilder();
JsonFormat.parser().merge(configJson, dataflowRunnerConfigOptions);
return new DataflowJobManager(
dataflowRunnerConfigOptions.build(), metrics, specsStreamingUpdateConfig);
return DataflowJobManager.of(
dataflowRunnerConfigOptions.build(),
metrics,
specsStreamingUpdateConfig,
jobProperties.getCoordinator().getJobSelector());
case DIRECT:
DirectRunnerConfigOptions.Builder directRunnerConfigOptions =
DirectRunnerConfigOptions.newBuilder();
JsonFormat.parser().merge(configJson, directRunnerConfigOptions);
JsonFormat.parser().merge(gson.toJson(runnerConfigOptions), directRunnerConfigOptions);
return new DirectRunnerJobManager(
directRunnerConfigOptions.build(),
new DirectJobRegistry(),
Expand Down

This file was deleted.

46 changes: 26 additions & 20 deletions core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
*/
package feast.core.job;

import feast.core.dao.JobRepository;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -45,35 +43,43 @@ public ConsolidatedJobStrategy(JobRepository jobRepository) {
}

@Override
public Job getOrCreateJob(Source source, Set<Store> stores) {
public Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores) {
return jobRepository
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
.findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source, null, JobStatus.getTerminalStates())
.orElseGet(
() -> {
Job job =
Job.builder().setSource(source).setFeatureSetJobStatuses(new HashSet<>()).build();
job.setStores(stores);
return job;
});
() ->
Job.builder()
.setId(createJobId(source))
.setSource(source)
.setStores(
stores.stream()
.collect(Collectors.toMap(StoreProto.Store::getName, s -> s)))
.build());
}

@Override
public String createJobId(Job job) {
private String createJobId(SourceProto.Source source) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String jobId =
String.format(
"%s-%d-%s",
job.getSource().getTypeString(),
Objects.hashCode(job.getSource().getConfig()),
source.getType().getValueDescriptor().getName(),
Objects.hash(
source.getKafkaSourceConfig().getBootstrapServers(),
source.getKafkaSourceConfig().getTopic()),
dateSuffix);
return jobId.replaceAll("_store", "-").toLowerCase();
}

@Override
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream) {
Map<Source, Set<Store>> map =
public String createJobId(Job job) {
return createJobId(job.getSource());
}

@Override
public Iterable<Pair<SourceProto.Source, Set<StoreProto.Store>>> collectSingleJobInput(
Stream<Pair<SourceProto.Source, StoreProto.Store>> stream) {
Map<SourceProto.Source, Set<StoreProto.Store>> map =
stream.collect(
Collectors.groupingBy(
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
Expand Down
97 changes: 97 additions & 0 deletions core/src/main/java/feast/core/job/InMemoryJobRepository.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job;

import com.google.common.collect.Lists;
import feast.common.models.FeatureSetReference;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.proto.core.SourceProto;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * {@link JobRepository} implementation that keeps every job in a process-local map keyed by job
 * id.
 *
 * <p>The map is seeded once, at construction time, with the jobs returned by {@link
 * JobManager#listJobs()}. The backing {@link HashMap} is not synchronized.
 */
@Component
public class InMemoryJobRepository implements JobRepository {
  private final JobManager jobManager;

  /** Jobs indexed by {@link Job#getId()}; always a mutable map owned by this repository. */
  private final Map<String, Job> storage = new HashMap<>();

  /**
   * Creates the repository and pre-populates it with the jobs reported by the job manager.
   *
   * @param jobManager source of the initial set of jobs
   * @throws IllegalStateException if the job manager reports two jobs with the same id
   */
  @Autowired
  public InMemoryJobRepository(JobManager jobManager) {
    this.jobManager = jobManager;

    // Collectors.toMap gives no guarantee that the map it returns is mutable, while add() and
    // deleteAll() mutate the storage. Copy the collected entries into our own HashMap instead of
    // adopting the collector's map.
    this.storage.putAll(
        this.jobManager.listJobs().stream().collect(Collectors.toMap(Job::getId, j -> j)));
  }

  /**
   * Finds the most recently updated job for the given source whose status is not excluded.
   *
   * @param source source the job must be ingesting from
   * @param storeName name of a store the job must write to; {@code null} matches any store
   * @param statuses statuses that disqualify a job
   * @return the matching job with the greatest {@code lastUpdated}, or empty if none matches
   */
  @Override
  public Optional<Job> findFirstBySourceAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
      SourceProto.Source source, String storeName, Collection<JobStatus> statuses) {
    return this.storage.values().stream()
        .filter(
            j ->
                j.getSource().equals(source)
                    && (storeName == null || j.getStores().containsKey(storeName))
                    && (!statuses.contains(j.getStatus())))
        .max(Comparator.comparing(Job::getLastUpdated));
  }

  /** Returns all stored jobs accepted by the given predicate, as a new list. */
  private List<Job> findWithFilter(Predicate<Job> p) {
    return this.storage.values().stream().filter(p).collect(Collectors.toList());
  }

  /**
   * Finds all jobs that currently have the given status.
   *
   * @param status status to look for
   * @return jobs whose status equals {@code status}
   */
  @Override
  public List<Job> findByStatus(JobStatus status) {
    // Null-safe comparison: a job without a status simply does not match instead of failing the
    // whole lookup with a NullPointerException.
    return this.findWithFilter(j -> Objects.equals(j.getStatus(), status));
  }

  /**
   * Finds all jobs that track a delivery status for the given feature set reference.
   *
   * @param reference feature set reference to look for
   * @return jobs whose feature set delivery statuses contain {@code reference} as a key
   */
  @Override
  public List<Job> findByFeatureSetReference(FeatureSetReference reference) {
    return this.findWithFilter(j -> j.getFeatureSetDeliveryStatuses().containsKey(reference));
  }

  /**
   * Finds all jobs that have a store with the given name.
   *
   * @param storeName store name to look for
   * @return jobs whose stores contain {@code storeName} as a key
   */
  @Override
  public List<Job> findByJobStoreName(String storeName) {
    return this.findWithFilter(j -> j.getStores().containsKey(storeName));
  }

  /**
   * Looks a job up by its id.
   *
   * @param jobId id of the job
   * @return the stored job, or empty if no job with that id is stored
   */
  @Override
  public Optional<Job> findById(String jobId) {
    return Optional.ofNullable(this.storage.get(jobId));
  }

  /** Returns a new list containing every stored job. */
  @Override
  public List<Job> findAll() {
    return Lists.newArrayList(this.storage.values());
  }

  /**
   * Stores the job under its id, replacing any job previously stored under the same id.
   *
   * @param job job to store
   */
  @Override
  public void add(Job job) {
    // Let the job run its own pre-save hook before it becomes visible through the repository.
    job.preSave();

    this.storage.put(job.getId(), job);
  }

  /** Removes every job from the repository. */
  @Override
  public void deleteAll() {
    this.storage.clear();
  }
}
12 changes: 6 additions & 6 deletions core/src/main/java/feast/core/job/JobGroupingStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package feast.core.job;

import feast.core.model.Job;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.proto.core.SourceProto;
import feast.proto.core.StoreProto;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -29,10 +29,10 @@
*/
public interface JobGroupingStrategy {
/** Get the non terminated ingestion job ingesting for given source and stores. */
public Job getOrCreateJob(Source source, Set<Store> stores);
Job getOrCreateJob(SourceProto.Source source, Set<StoreProto.Store> stores);
/** Create unique JobId that would be used as key in communications with JobRunner */
public String createJobId(Job job);
String createJobId(Job job);
/** Distribute given sources and stores across jobs. One yielded Pair - one created Job. */
public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
Stream<Pair<Source, Store>> stream);
Iterable<Pair<SourceProto.Source, Set<StoreProto.Store>>> collectSingleJobInput(
Stream<Pair<SourceProto.Source, StoreProto.Store>> stream);
}
8 changes: 8 additions & 0 deletions core/src/main/java/feast/core/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import feast.core.model.Job;
import feast.core.model.JobStatus;
import java.util.List;

public interface JobManager {

Expand Down Expand Up @@ -70,4 +71,11 @@ public interface JobManager {
* @return job status.
*/
JobStatus getJobStatus(Job job);

/**
* Lists the jobs that are currently RUNNING.
*
* @return list of RUNNING jobs
*/
List<Job> listJobs();
}
Loading

0 comments on commit 3532dcf

Please sign in to comment.