Skip to content

Commit

Permalink
Deallocate featureSet from job when source changed (#844)
Browse files Browse the repository at this point in the history
* allocate featureSet to Jobs

* lint
  • Loading branch information
Oleksii Moskalenko authored Jun 30, 2020
1 parent 9f35706 commit d6c2a65
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.dao;

import feast.core.model.FeatureSetJobStatus;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * JPA repository for {@link FeatureSetJobStatus} link entities, which connect a FeatureSet to the
 * ingestion {@link feast.core.model.Job} responsible for it. Keyed by the composite
 * {@link FeatureSetJobStatus.FeatureSetJobStatusKey} (job id + feature set id).
 */
public interface FeatureSetJobStatusRepository
    extends JpaRepository<FeatureSetJobStatus, FeatureSetJobStatus.FeatureSetJobStatusKey> {
  // NOTE(review): count() is already inherited from CrudRepository; this redeclaration is
  // redundant but harmless — kept for explicitness.
  long count();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,6 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
  // Hash only the identity fields (job, featureSet) so hashCode stays consistent with equals;
  // mutable state such as deliveryStatus/version must not participate, or the entity would
  // change buckets in hash-based collections after an update.
  // (The scraped diff had both the old and the new return statement; only the new one is valid.)
  return Objects.hashCode(job, featureSet);
}
}
41 changes: 30 additions & 11 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ public boolean isDeployed() {
return getExtId() != null && !getExtId().isEmpty();
}

public List<FeatureSet> getFeatureSets() {
public Set<FeatureSet> getFeatureSets() {
return this.featureSetJobStatuses.stream()
.map(FeatureSetJobStatus::getFeatureSet)
.collect(Collectors.toList());
.collect(Collectors.toSet());
}

public Source getSource() {
Expand All @@ -128,6 +128,22 @@ public Source getSource() {
return source;
}

/**
 * Links every given {@link FeatureSet} to this job by creating a {@link FeatureSetJobStatus}
 * with delivery status IN_PROGRESS. For feature sets already in READY state, the link starts at
 * the feature set's current version, since their specs were already delivered via a previous
 * job generation.
 *
 * @param featureSets feature sets to allocate to this job
 */
public void addAllFeatureSets(Set<FeatureSet> featureSets) {
  featureSets.forEach(
      featureSet -> {
        FeatureSetJobStatus connection = new FeatureSetJobStatus();
        connection.setFeatureSet(featureSet);
        connection.setJob(this);
        if (featureSet.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) {
          // A READY feature set was already delivered to a previous generation of this job
          // (in other words, its spec already exists in Kafka), so the job is expected to
          // ack the latest version based on the topic history.
          connection.setVersion(featureSet.getVersion());
        }
        connection.setDeliveryStatus(
            FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
        this.getFeatureSetJobStatuses().add(connection);
      });
}

/**
* Convert a job model to ingestion job proto
*
Expand Down Expand Up @@ -159,15 +175,18 @@ public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferExce
}

/**
 * Creates a fresh copy of this job sharing the same source/stores/runner configuration.
 *
 * <p>The clone starts with status UNKNOWN and an empty external id, and re-allocates this job's
 * current feature sets onto the new instance via {@link #addAllFeatureSets(Set)} so the copy
 * assumes responsibility for the same feature sets.
 * (The scraped diff contained both the old and the new body concatenated; this is the valid
 * post-commit implementation.)
 */
public Job clone() {
  Job job =
      Job.builder()
          .setStores(getStores())
          .setStoreName(getStoreName())
          .setSourceConfig(getSourceConfig())
          .setSourceType(getSourceType())
          .setFeatureSetJobStatuses(new HashSet<>())
          .setRunner(getRunner())
          .setStatus(JobStatus.UNKNOWN)
          .build();
  // Carry the current allocations over to the new job generation.
  job.addAllFeatureSets(getFeatureSets());
  return job;
}

@Override
Expand Down
85 changes: 51 additions & 34 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
*/
package feast.core.service;

import static feast.common.models.Store.isSubscribedToFeatureSet;
import static feast.core.model.FeatureSet.parseReference;

import com.google.common.collect.Sets;
import feast.core.config.FeastProperties;
import feast.core.config.FeastProperties.JobProperties;
import feast.core.dao.FeatureSetJobStatusRepository;
import feast.core.dao.FeatureSetRepository;
import feast.core.dao.JobRepository;
import feast.core.job.*;
Expand Down Expand Up @@ -58,6 +60,7 @@ public class JobCoordinatorService {

private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
private final FeatureSetJobStatusRepository jobStatusRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;
Expand All @@ -68,13 +71,15 @@ public class JobCoordinatorService {
public JobCoordinatorService(
JobRepository jobRepository,
FeatureSetRepository featureSetRepository,
FeatureSetJobStatusRepository jobStatusRepository,
SpecService specService,
JobManager jobManager,
FeastProperties feastProperties,
JobGroupingStrategy groupingStrategy,
KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
this.jobRepository = jobRepository;
this.featureSetRepository = featureSetRepository;
this.jobStatusRepository = jobStatusRepository;
this.specService = specService;
this.jobManager = jobManager;
this.jobProperties = feastProperties.getJobs();
Expand Down Expand Up @@ -152,10 +157,6 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
for (Pair<Source, Set<Store>> mapping : sourceToStores) {
Source source = mapping.getKey();
Set<Store> stores = mapping.getValue();
Set<FeatureSet> featureSets =
stores.stream()
.flatMap(s -> getFeatureSetsForStore(s).stream())
.collect(Collectors.toSet());

Job job = groupingStrategy.getOrCreateJob(source, stores);

Expand Down Expand Up @@ -185,12 +186,15 @@ List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStor
}
} else {
job.setId(groupingStrategy.createJobId(job));
job.addAllFeatureSets(
stores.stream()
.flatMap(s -> getFeatureSetsForStore(s).stream())
.filter(fs -> fs.getSource().equals(source))
.collect(Collectors.toSet()));

jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
}

allocateFeatureSets(job, featureSets);

// Record the job as required to safeguard it from getting stopped
activeJobs.add(job);
}
Expand Down Expand Up @@ -227,38 +231,50 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
}

/**
* Connects given {@link Job} with FeatureSets by creating {@link FeatureSetJobStatus}. This
* connection represents responsibility of the job to handle allocated FeatureSets. We use this
* Connects given {@link FeatureSet} with Jobs by creating {@link FeatureSetJobStatus}. This
* connection represents responsibility of the job to handle allocated FeatureSet. We use this
* connection {@link FeatureSetJobStatus} to monitor Ingestion of specific FeatureSet and Specs
* delivery status.
*
* <p>Only after this connection is created FeatureSetSpec could be sent to IngestionJob.
*
* @param job {@link Job} responsible job
* @param featureSets Set of {@link FeatureSet} featureSets to allocate to this job
* @param featureSet featureSet {@link FeatureSet} to find jobs and allocate
*/
void allocateFeatureSets(Job job, Set<FeatureSet> featureSets) {
Map<FeatureSet, FeatureSetJobStatus> alreadyConnected =
job.getFeatureSetJobStatuses().stream()
.collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s));

for (FeatureSet fs : featureSets) {
if (alreadyConnected.containsKey(fs)) {
FeatureSet allocateFeatureSetToJobs(FeatureSet featureSet) {
Set<FeatureSetJobStatus> toAdd = new HashSet<>();
Set<FeatureSetJobStatus> existing = featureSet.getJobStatuses();

Stream<Pair<Source, Store>> jobArgsStream =
getAllStores().stream()
.filter(
s ->
isSubscribedToFeatureSet(
s.getSubscriptions(),
featureSet.getProject().getName(),
featureSet.getName()))
.map(s -> Pair.of(featureSet.getSource(), s));

for (Pair<Source, Set<Store>> jobArgs : groupingStrategy.collectSingleJobInput(jobArgsStream)) {
Job job = groupingStrategy.getOrCreateJob(jobArgs.getLeft(), jobArgs.getRight());
if (!job.isRunning()) {
continue;
}

FeatureSetJobStatus status = new FeatureSetJobStatus();
status.setFeatureSet(fs);
status.setFeatureSet(featureSet);
status.setJob(job);
if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) {
// Feature Set was already delivered to previous generation of the job
// (another words, it exists in kafka)
// so we expect Job will ack latest version based on history from kafka topic
status.setVersion(fs.getVersion());
}
status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
job.getFeatureSetJobStatuses().add(status);

toAdd.add(status);
}

Set<FeatureSetJobStatus> toDelete = Sets.difference(existing, toAdd);
toAdd = Sets.difference(toAdd, existing);

jobStatusRepository.deleteAll(toDelete);
jobStatusRepository.saveAll(toAdd);
jobStatusRepository.flush();
return featureSet;
}

/** Get running extra ingestion jobs that have ids not in keepJobs */
Expand All @@ -271,24 +287,24 @@ private Collection<Job> getExtraJobs(List<Job> keepJobs) {
return extraJobMap.values();
}

/** Fetches every registered {@link Store} from the spec service (unfiltered listing). */
private List<Store> getAllStores() {
  return specService.listStores(Filter.newBuilder().build()).getStoreList().stream()
      .map(Store::fromProto)
      .collect(Collectors.toList());
}

/**
* Generate a source to stores mapping. The resulting iterable yields pairs of Source and
* Set-of-stores to create one ingestion job per each pair.
*
* @return a Map from source to stores.
*/
private Iterable<Pair<Source, Set<Store>>> getSourceToStoreMappings() {
ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
List<Store> stores =
listStoresResponse.getStoreList().stream()
.map(Store::fromProto)
.collect(Collectors.toList());

// build mapping from source to store.
// compile a set of sources via subscribed FeatureSets of stores.

Stream<Pair<Source, Store>> distinctPairs =
stores.stream()
getAllStores().stream()
.flatMap(
store ->
getFeatureSetsForStore(store).stream()
Expand Down Expand Up @@ -325,6 +341,7 @@ public void notifyJobsWhenFeatureSetUpdated() {
featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING);

pendingFeatureSets.stream()
.map(this::allocateFeatureSetToJobs)
.filter(
fs -> {
List<FeatureSetJobStatus> runningJobs =
Expand All @@ -334,7 +351,7 @@ public void notifyJobsWhenFeatureSetUpdated() {

return runningJobs.size() > 0
&& runningJobs.stream()
.allMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion());
.anyMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion());
})
.forEach(
fs -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Enforce one FeatureSetJobStatus row per (job, feature set) pair so a feature set
-- cannot be allocated to the same job twice.
ALTER TABLE jobs_feature_sets ADD CONSTRAINT jobs_feature_sets_pkey PRIMARY KEY (job_id, feature_sets_id);

-- Likewise, a job may reference each store at most once.
ALTER TABLE jobs_stores ADD CONSTRAINT jobs_stores_pkey PRIMARY KEY (job_id, store_name);
Loading

0 comments on commit d6c2a65

Please sign in to comment.