diff --git a/core/src/main/java/feast/core/model/Store.java b/core/src/main/java/feast/core/model/Store.java index 9288217e74..bd16a5cf3e 100644 --- a/core/src/main/java/feast/core/model/Store.java +++ b/core/src/main/java/feast/core/model/Store.java @@ -47,7 +47,7 @@ @AllArgsConstructor @Entity @Table(name = "stores") -public class Store { +public class Store extends AbstractTimestampEntity { // Name of the store. Must be unique @Id diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 08b8b90411..feb2958944 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -223,6 +223,10 @@ private boolean jobRequiresUpgrade(Job job, Set stores) { return true; } + if (stores.stream().anyMatch(s -> s.getLastUpdated().after(job.getCreated()))) { + return true; + } + return false; } diff --git a/core/src/main/resources/db/migration/V2.4__Store_Timestamps.sql b/core/src/main/resources/db/migration/V2.4__Store_Timestamps.sql new file mode 100644 index 0000000000..b69cd9c633 --- /dev/null +++ b/core/src/main/resources/db/migration/V2.4__Store_Timestamps.sql @@ -0,0 +1,2 @@ +ALTER TABLE stores ADD COLUMN created timestamp default now(); +ALTER TABLE stores ADD COLUMN last_updated timestamp default now(); \ No newline at end of file diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 2a4fc24693..6db9f94317 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -53,6 +53,7 @@ import feast.proto.core.SourceProto.SourceType; import feast.proto.core.StoreProto; import feast.proto.core.StoreProto.Store.Subscription; +import java.time.Instant; import java.util.*; import java.util.concurrent.CancellationException; import lombok.SneakyThrows; @@ -740,6 +741,35 @@ public void shouldUpgradeJobWhenNeeded() { assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask); } + @Test + public void shouldUpgradeJobWhenStoreChanged() { + Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false); + Store store = TestUtil.createStore("store", Collections.emptyList()); + + Job job = + Job.builder() + .setStatus(JobStatus.RUNNING) + .setFeatureSetJobStatuses(new HashSet<>()) + .setSource(source) + .setStores(ImmutableSet.of(store)) + .setExtId("extId") + .build(); + + job.setCreated(Date.from(Instant.now())); + store.setLastUpdated(Date.from(Instant.now().plusSeconds(1))); + + when(jobRepository + .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( + source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())) + .thenReturn(Optional.of(job)); + + List tasks = + jcsWithConsolidation.makeJobUpdateTasks( + ImmutableList.of(Pair.of(source, ImmutableSet.of(store)))); + + assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask); + } + @Test public void shouldCreateJobIfNoRunning() { Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);