Skip to content

Commit

Permalink
check store lastUpdate (#846)
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleksii Moskalenko authored Jun 30, 2020
1 parent 8331482 commit 3945180
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/model/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@AllArgsConstructor
@Entity
@Table(name = "stores")
public class Store {
public class Store extends AbstractTimestampEntity {

// Name of the store. Must be unique
@Id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ private boolean jobRequiresUpgrade(Job job, Set<Store> stores) {
return true;
}

if (stores.stream().anyMatch(s -> s.getLastUpdated().after(job.getCreated()))) {
return true;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE stores ADD COLUMN created timestamp default now();
ALTER TABLE stores ADD COLUMN last_updated timestamp default now();
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import feast.proto.core.SourceProto.SourceType;
import feast.proto.core.StoreProto;
import feast.proto.core.StoreProto.Store.Subscription;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CancellationException;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -749,6 +750,35 @@ public void shouldUpgradeJobWhenNeeded() {
assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask);
}

@Test
public void shouldUpgradeJobWhenStoreChanged() {
Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);
Store store = TestUtil.createStore("store", Collections.emptyList());

Job job =
Job.builder()
.setStatus(JobStatus.RUNNING)
.setFeatureSetJobStatuses(new HashSet<>())
.setSource(source)
.setStores(ImmutableSet.of(store))
.setExtId("extId")
.build();

job.setCreated(Date.from(Instant.now()));
store.setLastUpdated(Date.from(Instant.now().plusSeconds(1)));

when(jobRepository
.findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
.thenReturn(Optional.of(job));

List<JobTask> tasks =
jcsWithConsolidation.makeJobUpdateTasks(
ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));

assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask);
}

@Test
public void shouldCreateJobIfNoRunning() {
Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);
Expand Down

0 comments on commit 3945180

Please sign in to comment.