Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move communication with IngestionJob to JobCoordinator #800

Merged
merged 1 commit into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/config/FeastProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public static class FeatureSetSpecStreamProperties {

/* Kafka topic to receive acknowledgment from ingestion job on successful processing of new specs */
@NotBlank private String specsAckTopic = "feast-feature-set-specs-ack";

/* Notify jobs interval in milliseconds.
How frequently Feast will check on pending FeatureSets and publish them to Kafka. */
@Positive private long notifyIntervalMilliseconds;
}
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.core.dao;

import feast.core.model.FeatureSet;
import feast.proto.core.FeatureSetProto;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;

Expand All @@ -37,4 +38,7 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>
// find all feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAsc(
String name, String project_name);

// find all feature sets matching given status
List<FeatureSet> findAllByStatus(FeatureSetProto.FeatureSetStatus status);
}
17 changes: 17 additions & 0 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import feast.proto.core.FeatureSetProto;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
Expand Down Expand Up @@ -149,10 +151,24 @@ private Job startJob(String jobId) {
}

private void updateFeatureSets(Job job) {
Map<FeatureSet, FeatureSetJobStatus> alreadyConnected =
job.getFeatureSetJobStatuses().stream()
.collect(Collectors.toMap(FeatureSetJobStatus::getFeatureSet, s -> s));

for (FeatureSet fs : featureSets) {
if (alreadyConnected.containsKey(fs)) {
continue;
}

FeatureSetJobStatus status = new FeatureSetJobStatus();
status.setFeatureSet(fs);
status.setJob(job);
if (fs.getStatus() == FeatureSetProto.FeatureSetStatus.STATUS_READY) {
// Feature Set was already delivered to previous generation of the job
// (in other words, it already exists in Kafka)
// so we expect Job will ack latest version based on history from kafka topic
status.setVersion(fs.getVersion());
}
status.setDeliveryStatus(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
job.getFeatureSetJobStatuses().add(status);
}
Expand All @@ -175,6 +191,7 @@ private Job updateStatus(Job job) {
}

job.setStatus(newStatus);
updateFeatureSets(job);
return job;
}

Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/feast/core/model/FeatureSetJobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.model;

import com.google.common.base.Objects;
import feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus;
import java.io.Serializable;
import javax.persistence.*;
Expand Down Expand Up @@ -61,4 +62,23 @@ public FeatureSetJobStatusKey() {}
@Enumerated(EnumType.STRING)
@Column(name = "delivery_status")
private FeatureSetJobDeliveryStatus deliveryStatus;

@Column(name = "version", columnDefinition = "integer default 0")
private int version;

  /**
   * Equality is based on the logical identity of the job/feature-set link: the owning job's id,
   * the feature set's reference, the delivery status, and the delivered spec version.
   *
   * <p>NOTE(review): compares {@code job.getId()} and {@code featureSet.getReference()} rather
   * than the associated entities themselves; both associations are assumed non-null here —
   * TODO confirm callers never compare instances with unset associations.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    // Strict class check (not instanceof): subclasses are never considered equal.
    if (o == null || getClass() != o.getClass()) return false;
    FeatureSetJobStatus that = (FeatureSetJobStatus) o;
    return version == that.version
        && Objects.equal(job.getId(), that.job.getId())
        && Objects.equal(featureSet.getReference(), that.featureSet.getReference())
        && deliveryStatus == that.deliveryStatus;
  }

@Override
public int hashCode() {
return Objects.hashCode(job, featureSet, deliveryStatus, version);
}
}
140 changes: 135 additions & 5 deletions core/src/main/java/feast/core/service/JobCoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package feast.core.service;

import static feast.core.model.FeatureSet.parseReference;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.config.FeastProperties;
import feast.core.config.FeastProperties.JobProperties;
Expand All @@ -26,21 +28,24 @@
import feast.core.model.*;
import feast.proto.core.CoreServiceProto.ListStoresRequest.Filter;
import feast.proto.core.CoreServiceProto.ListStoresResponse;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.StoreProto;
import feast.proto.core.StoreProto.Store.Subscription;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import javax.validation.constraints.Positive;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -49,24 +54,29 @@
@Service
public class JobCoordinatorService {

private final int SPEC_PUBLISHING_TIMEOUT_SECONDS = 5;

private final JobRepository jobRepository;
private final FeatureSetRepository featureSetRepository;
private final SpecService specService;
private final JobManager jobManager;
private final JobProperties jobProperties;
private final KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher;

@Autowired
public JobCoordinatorService(
JobRepository jobRepository,
FeatureSetRepository featureSetRepository,
SpecService specService,
JobManager jobManager,
FeastProperties feastProperties) {
FeastProperties feastProperties,
KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
this.jobRepository = jobRepository;
this.featureSetRepository = featureSetRepository;
this.specService = specService;
this.jobManager = jobManager;
this.jobProperties = feastProperties.getJobs();
this.specPublisher = specPublisher;
}

/**
Expand Down Expand Up @@ -153,4 +163,124 @@ public Optional<Job> getJob(Source source, Store store) {
// return the latest
return Optional.of(jobs.get(0));
}

@Transactional
pyalex marked this conversation as resolved.
Show resolved Hide resolved
@Scheduled(fixedDelayString = "${feast.stream.specsOptions.notifyIntervalMilliseconds}")
public void notifyJobsWhenFeatureSetUpdated() {
List<FeatureSet> pendingFeatureSets =
featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING);

pendingFeatureSets.stream()
.filter(
fs -> {
List<FeatureSetJobStatus> runningJobs =
fs.getJobStatuses().stream()
.filter(jobStatus -> jobStatus.getJob().isRunning())
.collect(Collectors.toList());

return runningJobs.size() > 0
&& runningJobs.stream()
.allMatch(jobStatus -> jobStatus.getVersion() < fs.getVersion());
})
.forEach(
fs -> {
log.info("Sending new FeatureSet {} to Ingestion", fs.getReference());

// Sending latest version of FeatureSet to all currently running IngestionJobs
// (there's one topic for all sets).
// All related jobs would apply new FeatureSet on the fly.
// In case kafka doesn't respond within SPEC_PUBLISHING_TIMEOUT_SECONDS we will try
// again later.
try {
specPublisher
.sendDefault(fs.getReference(), fs.toProto().getSpec())
.get(SPEC_PUBLISHING_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(
"Error occurred while sending FeatureSetSpec to kafka. Cause {}."
+ " Will retry later",
e.getMessage());
return;
}

// Updating delivery status for related jobs (that are currently using this
// FeatureSet).
// We now set status to IN_PROGRESS, so listenAckFromJobs would be able to
// monitor delivery progress for each new version.
fs.getJobStatuses().stream()
.filter(s -> s.getJob().isRunning())
.forEach(
jobStatus -> {
jobStatus.setDeliveryStatus(
FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS);
jobStatus.setVersion(fs.getVersion());
});
featureSetRepository.saveAndFlush(fs);
});
}

/**
* Listener for ACK messages coming from IngestionJob when FeatureSetSpec is installed (in
* pipeline).
*
* <p>Updates FeatureSetJobStatus for respected FeatureSet (selected by reference) and Job (select
* by Id).
*
* <p>When all related (running) to FeatureSet jobs are updated - FeatureSet receives READY status
*
* @param record ConsumerRecord with key: FeatureSet reference and value: Ack message
*/
@KafkaListener(topics = {"${feast.stream.specsOptions.specsAckTopic}"})
@Transactional
public void listenAckFromJobs(
ConsumerRecord<String, IngestionJobProto.FeatureSetSpecAck> record) {
String setReference = record.key();
Pair<String, String> projectAndSetName = parseReference(setReference);
FeatureSet featureSet =
featureSetRepository.findFeatureSetByNameAndProject_Name(
projectAndSetName.getSecond(), projectAndSetName.getFirst());
if (featureSet == null) {
log.warn(
String.format("ACKListener received message for unknown FeatureSet %s", setReference));
return;
}

int ackVersion = record.value().getFeatureSetVersion();

if (featureSet.getVersion() != ackVersion) {
log.warn(
String.format(
"ACKListener received outdated ack for %s. Current %d, Received %d",
setReference, featureSet.getVersion(), ackVersion));
return;
}

log.info("Updating featureSet {} delivery statuses.", featureSet.getReference());

featureSet.getJobStatuses().stream()
.filter(
js ->
js.getJob().getId().equals(record.value().getJobName())
&& js.getVersion() == ackVersion)
.findFirst()
.ifPresent(
featureSetJobStatus ->
featureSetJobStatus.setDeliveryStatus(
FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));

boolean allDelivered =
featureSet.getJobStatuses().stream()
.filter(js -> js.getJob().isRunning())
.allMatch(
js ->
js.getDeliveryStatus()
.equals(FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED));

if (allDelivered) {
log.info("FeatureSet {} update is completely delivered", featureSet.getReference());

featureSet.setStatus(FeatureSetProto.FeatureSetStatus.STATUS_READY);
featureSetRepository.saveAndFlush(featureSet);
}
}
}
Loading