From 26c8070c46cb1a9222d2e808e594ca276609e237 Mon Sep 17 00:00:00 2001
From: Oleksii Moskalenko
Date: Thu, 25 Jun 2020 09:10:59 +0300
Subject: [PATCH] Allow ingestion job grouping/consolidation to be configurable
 (#825)

* job grouping strategy

* apidocs & tests

* fix some old docs

* adding apidoc in Spring Config
---
 .../feast/core/config/FeastProperties.java    |   3 +
 .../java/feast/core/config/JobConfig.java     |  24 +++
 .../core/job/ConsolidatedJobStrategy.java     |  85 +++++++++
 .../java/feast/core/job/CreateJobTask.java    |  18 +-
 .../feast/core/job/JobGroupingStrategy.java   |  38 ++++
 .../feast/core/job/JobPerStoreStrategy.java   |  85 +++++++++
 .../core/service/JobCoordinatorService.java   |  63 +++----
 core/src/main/resources/application.yml       |   4 +
 .../java/feast/core/job/JobTasksTest.java     |  10 +-
 .../service/JobCoordinatorServiceTest.java    | 162 +++++++++++++-----
 .../ingestion/transform/ReadFromSource.java   |   3 +-
 11 files changed, 388 insertions(+), 107 deletions(-)
 create mode 100644 core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
 create mode 100644 core/src/main/java/feast/core/job/JobGroupingStrategy.java
 create mode 100644 core/src/main/java/feast/core/job/JobPerStoreStrategy.java

diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java
index 62d0d3bea5..753630732a 100644
--- a/core/src/main/java/feast/core/config/FeastProperties.java
+++ b/core/src/main/java/feast/core/config/FeastProperties.java
@@ -81,6 +81,9 @@ public static class JobProperties {
     /* The active Apache Beam runner name. This name references one instance of the Runner class */
     private String activeRunner;
 
+    /* If true, only one IngestionJob is created per source, with all subscribed stores included in it */
+    private Boolean consolidateJobsPerSource = false;
+
     /** List of configured job runners. */
     private List<Runner> runners = new ArrayList<>();
diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java
index fd0dc6ef5c..be240d20e9 100644
--- a/core/src/main/java/feast/core/config/JobConfig.java
+++ b/core/src/main/java/feast/core/config/JobConfig.java
@@ -20,7 +20,11 @@
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
 import feast.core.config.FeastProperties.JobProperties;
+import feast.core.dao.JobRepository;
+import feast.core.job.ConsolidatedJobStrategy;
+import feast.core.job.JobGroupingStrategy;
 import feast.core.job.JobManager;
+import feast.core.job.JobPerStoreStrategy;
 import feast.core.job.dataflow.DataflowJobManager;
 import feast.core.job.direct.DirectJobRegistry;
 import feast.core.job.direct.DirectRunnerJobManager;
@@ -64,6 +68,26 @@ public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateCo
         .build();
   }
 
+  /**
+   * Returns the grouping strategy that decides how ingestion is split across job instances (i.e.,
+   * how Sources and Stores are grouped together). The choice of strategy depends on the
+   * FeastProperties config "feast.jobs.consolidate-jobs-per-source".
+   *
+   * @param feastProperties feast config properties
+   * @param jobRepository repository required by the strategy
+   * @return JobGroupingStrategy
+   */
+  @Bean
+  public JobGroupingStrategy getJobGroupingStrategy(
+      FeastProperties feastProperties, JobRepository jobRepository) {
+    Boolean shouldConsolidateJobs = feastProperties.getJobs().getConsolidateJobsPerSource();
+    if (shouldConsolidateJobs) {
+      return new ConsolidatedJobStrategy(jobRepository);
+    } else {
+      return new JobPerStoreStrategy(jobRepository);
+    }
+  }
+
   /**
    * Get a JobManager according to the runner type and Dataflow configuration.
    *
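Editor's note, not part of the patch: a minimal sketch of how this bean resolves once the flag is toggled. It assumes the Lombok-generated setters on `FeastProperties`/`JobProperties` (the tests below use `setJobs`; `setConsolidateJobsPerSource` is assumed to exist the same way) and passes a null `JobRepository` purely to inspect the returned type.

```java
import feast.core.config.FeastProperties;
import feast.core.config.JobConfig;
import feast.core.job.ConsolidatedJobStrategy;
import feast.core.job.JobPerStoreStrategy;

public class GroupingStrategySelectionSketch {
  public static void main(String[] args) {
    FeastProperties props = new FeastProperties();
    FeastProperties.JobProperties jobProps = new FeastProperties.JobProperties();
    props.setJobs(jobProps);
    JobConfig jobConfig = new JobConfig();

    // Default (false): one ingestion job per source-store pair.
    jobProps.setConsolidateJobsPerSource(false);
    System.out.println(
        jobConfig.getJobGroupingStrategy(props, null) instanceof JobPerStoreStrategy); // true

    // Flag flipped: one consolidated ingestion job per source.
    jobProps.setConsolidateJobsPerSource(true);
    System.out.println(
        jobConfig.getJobGroupingStrategy(props, null) instanceof ConsolidatedJobStrategy); // true
  }
}
```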
diff --git a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
new file mode 100644
index 0000000000..e9eb59ed61
--- /dev/null
+++ b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job;
+
+import feast.core.dao.JobRepository;
+import feast.core.model.Job;
+import feast.core.model.JobStatus;
+import feast.core.model.Source;
+import feast.core.model.Store;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * In this strategy one Ingestion Job per source is created. All stores subscribed to FeatureSets
+ * from this source are included as sinks in this consolidated Job.
+ *
+ * <p>The JobId contains only source parameters (type + config). StoreName remains empty in the Job
+ * table.
+ */
+public class ConsolidatedJobStrategy implements JobGroupingStrategy {
+  private final JobRepository jobRepository;
+
+  public ConsolidatedJobStrategy(JobRepository jobRepository) {
+    this.jobRepository = jobRepository;
+  }
+
+  @Override
+  public Job getOrCreateJob(Source source, Set<Store> stores) {
+    return jobRepository
+        .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+            source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
+        .orElseGet(
+            () ->
+                Job.builder()
+                    .setSource(source)
+                    .setStores(stores)
+                    .setFeatureSetJobStatuses(new HashSet<>())
+                    .build());
+  }
+
+  @Override
+  public String createJobId(Job job) {
+    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
+    String jobId =
+        String.format(
+            "%s-%d-%s",
+            job.getSource().getTypeString(),
+            Objects.hashCode(job.getSource().getConfig()),
+            dateSuffix);
+    return jobId.replaceAll("_store", "-").toLowerCase();
+  }
+
+  @Override
+  public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
+      Stream<Pair<Source, Store>> stream) {
+    Map<Source, Set<Store>> map =
+        stream.collect(
+            Collectors.groupingBy(
+                Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
+
+    return map.entrySet().stream()
+        .map(e -> Pair.of(e.getKey(), e.getValue()))
+        .collect(Collectors.toList());
+  }
+}
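To make the consolidation behavior concrete, here is a small self-contained demo (not part of the patch) of the same `Collectors.groupingBy` pattern that `collectSingleJobInput` uses. Plain strings stand in for Feast's `Source` and `Store` types; `Pair` is the same `commons-lang3` class the strategy itself uses:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;

public class ConsolidatedGroupingDemo {
  public static void main(String[] args) {
    Stream<Pair<String, String>> sourceStorePairs =
        Stream.of(
            Pair.of("kafka-source-a", "redis-store"),
            Pair.of("kafka-source-a", "bigquery-store"),
            Pair.of("kafka-source-b", "redis-store"));

    // Same shape as ConsolidatedJobStrategy.collectSingleJobInput:
    // one entry per source, with all of its stores collected into a single set.
    Map<String, Set<String>> jobInputs =
        sourceStorePairs.collect(
            Collectors.groupingBy(
                Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));

    // Prints two job inputs:
    //   kafka-source-a -> [redis-store, bigquery-store]
    //   kafka-source-b -> [redis-store]
    jobInputs.forEach((source, stores) -> System.out.println(source + " -> " + stores));
  }
}
```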
diff --git a/core/src/main/java/feast/core/job/CreateJobTask.java b/core/src/main/java/feast/core/job/CreateJobTask.java
index 867dcfc02a..a0d8d3a1d5 100644
--- a/core/src/main/java/feast/core/job/CreateJobTask.java
+++ b/core/src/main/java/feast/core/job/CreateJobTask.java
@@ -19,19 +19,13 @@
 import feast.core.log.Action;
 import feast.core.model.Job;
 import feast.core.model.JobStatus;
-import feast.core.model.Source;
-import java.time.Instant;
-import java.util.Objects;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Task that starts recently created {@link Job} by using {@link JobManager}. Since it's new job its
- * Id being generated from attached {@link Source} and updated accordingly in-place.
- */
+/** Task that starts a recently created {@link Job} by using {@link JobManager}. */
 @Getter
 @Setter
 @Builder(setterPrefix = "set")
@@ -43,12 +37,10 @@ public class CreateJobTask implements JobTask {
 
   @Override
   public Job call() {
-    String jobId = createJobId(job.getSource());
     String runnerName = jobManager.getRunnerType().toString();
 
     job.setRunner(jobManager.getRunnerType());
     job.setStatus(JobStatus.PENDING);
-    job.setId(jobId);
 
     try {
       JobTask.logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);
@@ -73,12 +65,4 @@ public Job call() {
       return job;
     }
   }
-
-  String createJobId(Source source) {
-    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
-    String jobId =
-        String.format(
-            "%s-%d-%s", source.getTypeString(), Objects.hashCode(source.getConfig()), dateSuffix);
-    return jobId.replaceAll("_store", "-").toLowerCase();
-  }
 }
diff --git a/core/src/main/java/feast/core/job/JobGroupingStrategy.java b/core/src/main/java/feast/core/job/JobGroupingStrategy.java
new file mode 100644
index 0000000000..826f415e22
--- /dev/null
+++ b/core/src/main/java/feast/core/job/JobGroupingStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job;
+
+import feast.core.model.Job;
+import feast.core.model.Source;
+import feast.core.model.Store;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Strategy interface that defines how responsibility for sources and stores is distributed across
+ * Ingestion Jobs.
+ */
+public interface JobGroupingStrategy {
+  /** Get the non-terminated ingestion job ingesting for the given source and stores. */
+  public Job getOrCreateJob(Source source, Set<Store> stores);
+  /** Create a unique JobId to be used as the key in communications with the JobRunner. */
+  public String createJobId(Job job);
+  /** Distribute the given sources and stores across jobs: one yielded Pair - one created Job. */
+  public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
+      Stream<Pair<Source, Store>> stream);
+}
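For orientation, this is roughly how `JobCoordinatorService` (changed later in this patch) drives the three interface methods on each polling cycle. A paraphrased sketch, not the literal coordinator code; `allSourceStorePairs` is a hypothetical name for the distinct (source, store) pairs the coordinator collects:

```java
// Paraphrased from makeJobUpdateTasks below - illustrative, not the literal code.
void reconcile(Stream<Pair<Source, Store>> allSourceStorePairs) {
  // 1. Let the strategy decide how (source, store) pairs are grouped into job inputs.
  Iterable<Pair<Source, Set<Store>>> inputs =
      groupingStrategy.collectSingleJobInput(allSourceStorePairs);

  for (Pair<Source, Set<Store>> input : inputs) {
    // 2. Reuse a non-terminated job for this grouping, or build a fresh one.
    Job job = groupingStrategy.getOrCreateJob(input.getLeft(), input.getRight());
    if (!job.isDeployed()) {
      // 3. Only brand-new jobs get an id; CreateJobTask no longer generates it.
      job.setId(groupingStrategy.createJobId(job));
      // ... submitted via CreateJobTask
    }
  }
}
```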
diff --git a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java
new file mode 100644
index 0000000000..a5d76b6358
--- /dev/null
+++ b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job;
+
+import com.google.common.collect.Lists;
+import feast.core.dao.JobRepository;
+import feast.core.model.Job;
+import feast.core.model.JobStatus;
+import feast.core.model.Source;
+import feast.core.model.Store;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * In this strategy one job per Source-Store pair is created.
+ *
+ * <p>The JobId is generated from the Source (type + config) and the StoreName.
+ */
+public class JobPerStoreStrategy implements JobGroupingStrategy {
+  private final JobRepository jobRepository;
+
+  public JobPerStoreStrategy(JobRepository jobRepository) {
+    this.jobRepository = jobRepository;
+  }
+
+  @Override
+  public Job getOrCreateJob(Source source, Set<Store> stores) {
+    ArrayList<Store> storesList = Lists.newArrayList(stores);
+    if (storesList.size() != 1) {
+      throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy");
+    }
+    Store store = storesList.get(0);
+
+    return jobRepository
+        .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+            source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates())
+        .orElseGet(
+            () ->
+                Job.builder()
+                    .setSource(source)
+                    .setStoreName(store.getName())
+                    .setStores(stores)
+                    .setFeatureSetJobStatuses(new HashSet<>())
+                    .build());
+  }
+
+  @Override
+  public String createJobId(Job job) {
+    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
+    String jobId =
+        String.format(
+            "%s-%d-to-%s-%s",
+            job.getSource().getTypeString(),
+            Objects.hashCode(job.getSource().getConfig()),
+            job.getStoreName(),
+            dateSuffix);
+    return jobId.replaceAll("_store", "-").toLowerCase();
+  }
+
+  @Override
+  public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
+      Stream<Pair<Source, Store>> stream) {
+    return stream
+        .map(p -> Pair.of(p.getLeft(), Set.of(p.getRight())))
+        .collect(Collectors.toList());
+  }
+}
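For reference, the two strategies produce differently shaped job ids. The snippet below (not part of the patch; the config and store values are only examples) reproduces both format strings, including the `_store` -> `-` normalization both `createJobId` implementations apply:

```java
import java.time.Instant;
import java.util.Objects;

public class JobIdFormatDemo {
  public static void main(String[] args) {
    String sourceType = "kafka";
    String sourceConfig = "bootstrapServers: kafka:9092"; // stands in for Source.getConfig()
    String storeName = "test-1";
    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());

    // ConsolidatedJobStrategy: source parameters only, e.g. "kafka-1131290427-1593072000000"
    String consolidated =
        String.format("%s-%d-%s", sourceType, Objects.hashCode(sourceConfig), dateSuffix)
            .replaceAll("_store", "-")
            .toLowerCase();

    // JobPerStoreStrategy: source plus store name,
    // e.g. "kafka-1131290427-to-test-1-1593072000000"
    String perStore =
        String.format(
                "%s-%d-to-%s-%s", sourceType, Objects.hashCode(sourceConfig), storeName, dateSuffix)
            .replaceAll("_store", "-")
            .toLowerCase();

    System.out.println(consolidated);
    System.out.println(perStore);
  }
}
```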
diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java
index 7ed3f61807..8451dc9e51 100644
--- a/core/src/main/java/feast/core/service/JobCoordinatorService.java
+++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java
@@ -19,12 +19,10 @@
 import static feast.core.model.FeatureSet.parseReference;
 
 import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
 import feast.core.config.FeastProperties;
 import feast.core.config.FeastProperties.JobProperties;
 import feast.core.dao.FeatureSetRepository;
 import feast.core.dao.JobRepository;
-import feast.core.dao.SourceRepository;
 import feast.core.job.*;
 import feast.core.model.*;
 import feast.core.model.FeatureSet;
@@ -39,6 +37,7 @@
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -57,28 +56,28 @@ public class JobCoordinatorService {
 
   private final JobRepository jobRepository;
   private final FeatureSetRepository featureSetRepository;
-  private final SourceRepository sourceRepository;
   private final SpecService specService;
   private final JobManager jobManager;
   private final JobProperties jobProperties;
+  private final JobGroupingStrategy groupingStrategy;
   private final KafkaTemplate<String, FeatureSetSpec> specPublisher;
 
   @Autowired
   public JobCoordinatorService(
       JobRepository jobRepository,
       FeatureSetRepository featureSetRepository,
-      SourceRepository sourceRepository,
       SpecService specService,
       JobManager jobManager,
       FeastProperties feastProperties,
+      JobGroupingStrategy groupingStrategy,
       KafkaTemplate<String, FeatureSetSpec> specPublisher) {
     this.jobRepository = jobRepository;
     this.featureSetRepository = featureSetRepository;
-    this.sourceRepository = sourceRepository;
     this.specService = specService;
     this.jobManager = jobManager;
     this.jobProperties = feastProperties.getJobs();
     this.specPublisher = specPublisher;
+    this.groupingStrategy = groupingStrategy;
   }
 
   /**
@@ -94,9 +93,9 @@ public JobCoordinatorService(
    */
   @Transactional
   @Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}")
-  public void Poll() throws InvalidProtocolBufferException {
+  public void Poll() {
     log.info("Polling for new jobs...");
-    Map<Source, Set<Store>> sourceStoreMappings = getSourceToStoreMappings();
+    Iterable<Pair<Source, Set<Store>>> sourceStoreMappings = getSourceToStoreMappings();
     List<JobTask> jobUpdateTasks = makeJobUpdateTasks(sourceStoreMappings);
 
     if (jobUpdateTasks.isEmpty()) {
@@ -137,18 +136,18 @@ void startOrUpdateJobs(List<JobTask> tasks) {
    * stop ingestion jobs its the required ingestions to maintained ingestion jobs are already
    * RUNNING.
    *
-   * @param sourceStoresMappings a list of source to store pairs where ingestion jobs would have to
+   * @param sourceToStores an iterable of source-to-stores pairs where ingestion jobs would have to
    *     be maintained for ingestion to work correctly.
    * @return list of job update tasks required to reconcile the current ingestion jobs to the state
    *     that is defined by sourceStoreMap.
    */
-  List<JobTask> makeJobUpdateTasks(Map<Source, Set<Store>> sourceStoresMappings) {
+  List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStores) {
     List<JobTask> jobTasks = new LinkedList<>();
     // Ensure a running job for each source to store mapping
     List<Job> activeJobs = new LinkedList<>();
     boolean isSafeToStopJobs = true;
 
-    for (Map.Entry<Source, Set<Store>> mapping : sourceStoresMappings.entrySet()) {
+    for (Pair<Source, Set<Store>> mapping : sourceToStores) {
       Source source = mapping.getKey();
       Set<Store> stores = mapping.getValue();
       Set<FeatureSet> featureSets =
           stores.stream()
               .flatMap(s -> getFeatureSetsForStore(s).stream())
               .collect(Collectors.toSet());
 
-      Job job = getOrCreateJob(source, stores);
+      Job job = groupingStrategy.getOrCreateJob(source, stores);
 
       if (job.isDeployed()) {
         if (!job.isRunning()) {
@@ -175,6 +174,8 @@ List<JobTask> makeJobUpdateTasks(Map<Source, Set<Store>> sourceStoresMappings) {
           jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build());
         }
       } else {
+        job.setId(groupingStrategy.createJobId(job));
+
         jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
       }
 
@@ -250,21 +251,6 @@ void allocateFeatureSets(Job job, Set<FeatureSet> featureSets) {
     }
   }
 
-  /** Get the non terminated ingestion job ingesting for given source. */
-  public Job getOrCreateJob(Source source, Set<Store> stores) {
-    return jobRepository
-        .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-            source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
-        .orElseGet(
-            () ->
-                Job.builder()
-                    .setRunner(jobManager.getRunnerType())
-                    .setSource(source)
-                    .setStores(stores)
-                    .setFeatureSetJobStatuses(new HashSet<>())
-                    .build());
-  }
-
   /** Get running extra ingestion jobs that have ids not in keepJobs */
   @Transactional
   private Collection<Job> getExtraJobs(List<Job> keepJobs) {
@@ -276,12 +262,12 @@ private Collection<Job> getExtraJobs(List<Job> keepJobs) {
   }
 
   /**
-   * Generate a source to stores mapping. The mapping maps the source to Set-of-stores in which
-   * ingestion jobs would have to be maintained for ingestion to work correctly.
+   * Generate a source to stores mapping. The resulting iterable yields pairs of Source and
+   * Set-of-stores, so that one ingestion job is created per pair.
    *
    * @return an iterable of Source to Set-of-stores pairs.
    */
-  private Map<Source, Set<Store>> getSourceToStoreMappings() {
+  private Iterable<Pair<Source, Set<Store>>> getSourceToStoreMappings() {
     ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build());
     List<Store> stores =
         listStoresResponse.getStoreList().stream()
@@ -291,16 +277,15 @@ private Map<Source, Set<Store>> getSourceToStoreMappings() {
 
     // build mapping from source to store.
     // compile a set of sources via subscribed FeatureSets of stores.
-    return stores.stream()
-        .flatMap(
-            store ->
-                getFeatureSetsForStore(store).stream()
-                    .map(FeatureSet::getSource)
-                    .map(source -> Pair.of(source, store)))
-        .distinct()
-        .collect(
-            Collectors.groupingBy(
-                Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
+    Stream<Pair<Source, Store>> distinctPairs =
+        stores.stream()
+            .flatMap(
+                store ->
+                    getFeatureSetsForStore(store).stream()
+                        .map(FeatureSet::getSource)
+                        .map(source -> Pair.of(source, store)))
+            .distinct();
+    return groupingStrategy.collectSingleJobInput(distinctPairs);
   }
 
   /**
diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml
index 57728e0dc8..b82e5db0b4 100644
--- a/core/src/main/resources/application.yml
+++ b/core/src/main/resources/application.yml
@@ -60,6 +60,10 @@ feast:
       # Port of the metrics sink.
       port: 9125
 
+    # If true, one job per source (containing all subscribed stores) is created.
+    # If false, one job per source-store pair is created.
+    consolidate-jobs-per-source: false
+
   stream:
     # Feature stream type. Only kafka is supported.
     type: kafka
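To make those two comments concrete with a toy calculation (source and store counts invented): with two sources, where the first feeds three stores and the second one store, the flag changes the number of ingestion jobs as follows:

```java
import java.util.List;
import java.util.Set;

public class JobCountDemo {
  public static void main(String[] args) {
    // Stores subscribed per source: source #1 -> 3 stores, source #2 -> 1 store.
    List<Set<String>> storesPerSource =
        List.of(Set.of("redis", "bigquery", "cassandra"), Set.of("redis"));

    // consolidate-jobs-per-source: true  -> one job per source.
    int consolidated = storesPerSource.size(); // 2 jobs

    // consolidate-jobs-per-source: false -> one job per (source, store) pair.
    int perStore = storesPerSource.stream().mapToInt(Set::size).sum(); // 4 jobs

    System.out.printf("consolidated=%d, per-store=%d%n", consolidated, perStore);
  }
}
```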
diff --git a/core/src/test/java/feast/core/job/JobTasksTest.java b/core/src/test/java/feast/core/job/JobTasksTest.java
index 950bd23d1e..e0c0cd182c 100644
--- a/core/src/test/java/feast/core/job/JobTasksTest.java
+++ b/core/src/test/java/feast/core/job/JobTasksTest.java
@@ -18,8 +18,6 @@
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -107,10 +105,9 @@ TerminateJobTask makeTerminateTask(Job currentJob) {
 
   @Test
   public void shouldCreateJobIfNotPresent() {
-    Job expectedInput = makeJob("", Collections.emptyList(), JobStatus.PENDING);
+    Job expectedInput = makeJob("ext", Collections.emptyList(), JobStatus.PENDING);
 
-    CreateJobTask task = spy(makeCreateTask(expectedInput));
-    doReturn("job").when(task).createJobId(source);
+    CreateJobTask task = makeCreateTask(expectedInput);
 
     Job expected = makeJob("ext", Collections.emptyList(), JobStatus.RUNNING);
@@ -135,8 +132,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() {
   public void shouldReturnJobWithErrorStatusIfFailedToSubmit() {
     Job expectedInput = makeJob("", Collections.emptyList(), JobStatus.PENDING);
 
-    CreateJobTask jobUpdateTask = spy(makeCreateTask(expectedInput));
-    doReturn("job").when(jobUpdateTask).createJobId(source);
+    CreateJobTask jobUpdateTask = makeCreateTask(expectedInput);
 
     Job expected = makeJob("", Collections.emptyList(), JobStatus.ERROR);
diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
index c19641149b..83d3482a9a 100644
--- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
+++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java
@@ -18,16 +18,18 @@
 import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_DELIVERED;
 import static feast.proto.core.FeatureSetProto.FeatureSetJobDeliveryStatus.STATUS_IN_PROGRESS;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.beans.HasPropertyWithValue.hasProperty;
+import static org.hamcrest.core.StringContains.containsString;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -54,6 +56,7 @@
 import java.util.*;
 import java.util.concurrent.CancellationException;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Before;
 import org.junit.Rule;
@@ -75,7 +78,8 @@ public class JobCoordinatorServiceTest {
   @Mock SourceRepository sourceRepository;
 
   private FeastProperties feastProperties;
-  private JobCoordinatorService jcs;
+  private JobCoordinatorService jcsWithConsolidation;
+  private JobCoordinatorService jcsWithJobPerStore;
 
   @Before
   public void setUp() {
@@ -85,14 +89,24 @@ public void setUp() {
     jobProperties.setJobUpdateTimeoutSeconds(5);
     feastProperties.setJobs(jobProperties);
 
-    jcs =
+    jcsWithConsolidation =
         new JobCoordinatorService(
             jobRepository,
             featureSetRepository,
-            sourceRepository,
             specService,
             jobManager,
             feastProperties,
+            new ConsolidatedJobStrategy(jobRepository),
+            kafkaTemplate);
+
+    jcsWithJobPerStore =
+        new JobCoordinatorService(
+            jobRepository,
+            featureSetRepository,
+            specService,
+            jobManager,
+            feastProperties,
+            new JobPerStoreStrategy(jobRepository),
             kafkaTemplate);
 
     when(kafkaTemplate.sendDefault(any(), any())).thenReturn(new AsyncResult<>(null));
@@ -101,7 +115,7 @@ public void setUp() {
   @Test
   public void shouldDoNothingIfNoStoresFound() throws InvalidProtocolBufferException {
     when(specService.listStores(any())).thenReturn(ListStoresResponse.newBuilder().build());
-    jcs.Poll();
+    jcsWithConsolidation.Poll();
     verify(jobRepository, times(0)).saveAndFlush(any());
   }
@@ -117,7 +131,7 @@ public void shouldDoNothingIfNoMatchingFeatureSetsFound() throws InvalidProtocol
     when(specService.listFeatureSets(
             Filter.newBuilder().setProject("*").setFeatureSetName("*").build()))
         .thenReturn(ListFeatureSetsResponse.newBuilder().build());
-    jcs.Poll();
+    jcsWithConsolidation.Poll();
     verify(jobRepository, times(0)).saveAndFlush(any());
   }
@@ -194,7 +208,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep
             JobStatus.getTerminalStates()))
         .thenReturn(Optional.empty());
 
-    jcs.Poll();
+    jcsWithConsolidation.Poll();
     verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture());
     List<Job> actual = jobArgCaptor.getValue();
     assertThat(actual, containsInAnyOrder(expected));
@@ -259,20 +273,14 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException {
     when(jobRepository.findByStatus(JobStatus.RUNNING)).thenReturn(List.of());
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-                source1.getType(),
-                source1.getConfig(),
-                storeSpec.getName(),
-                JobStatus.getTerminalStates()))
+                source1.getType(), source1.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.empty());
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-                source2.getType(),
-                source2.getConfig(),
-                storeSpec.getName(),
-                JobStatus.getTerminalStates()))
+                source2.getType(), source2.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.empty());
 
-    jcs.Poll();
+    jcsWithConsolidation.Poll();
 
     ArgumentCaptor<List<Job>> jobArgCaptor = ArgumentCaptor.forClass(List.class);
     verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture());
@@ -332,15 +340,12 @@ public void shouldGroupJobsBySourceAndIgnoreDuplicateSourceObjects()
     when(jobRepository.findByStatus(JobStatus.RUNNING)).thenReturn(List.of());
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-                source1.getType(),
-                source1.getConfig(),
-                storeSpec.getName(),
-                JobStatus.getTerminalStates()))
+                source1.getType(), source1.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.empty());
 
     ArgumentCaptor<List<Job>> jobArgCaptor = ArgumentCaptor.forClass(List.class);
 
-    jcs.Poll();
+    jcsWithConsolidation.Poll();
     verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture());
     List<Job> actual = jobArgCaptor.getValue();
     assertThat(actual, containsInAnyOrder(expected));
@@ -400,7 +405,7 @@ public void shouldStopDuplicateJobsForSource() throws InvalidProtocolBufferExcep
         .thenReturn(Optional.of(inputJobs.get(0)));
     when(jobRepository.findByStatus(JobStatus.RUNNING)).thenReturn(inputJobs);
 
-    jcs.Poll();
+    jcsWithConsolidation.Poll();
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
@@ -496,7 +501,7 @@ public void shouldUseStoreSubscriptionToMapStore() throws InvalidProtocolBufferE
             source2.getType(), source2.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.empty());
 
-    jcs.Poll();
+    jcsWithConsolidation.Poll();
 
     verify(jobRepository, times(1)).saveAll(jobArgCaptor.capture());
     List<Job> actual = jobArgCaptor.getValue();
@@ -548,7 +553,7 @@ public void shouldSendPendingFeatureSetToJobs() {
     when(featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING))
         .thenReturn(ImmutableList.of(fs1, fs2, fs3));
 
-    jcs.notifyJobsWhenFeatureSetUpdated();
+    jcsWithConsolidation.notifyJobsWhenFeatureSetUpdated();
 
     verify(kafkaTemplate).sendDefault(eq(fs1.getReference()), any(FeatureSetSpec.class));
     verify(kafkaTemplate, never()).sendDefault(eq(fs2.getReference()), any(FeatureSetSpec.class));
@@ -581,7 +586,7 @@ public void shouldNotUpdateJobStatusVersionWhenKafkaUnavailable() {
     when(featureSetRepository.findAllByStatus(FeatureSetProto.FeatureSetStatus.STATUS_PENDING))
         .thenReturn(ImmutableList.of(fsInTest));
 
-    jcs.notifyJobsWhenFeatureSetUpdated();
+    jcsWithConsolidation.notifyJobsWhenFeatureSetUpdated();
     assertThat(featureSetJobStatus.getVersion(), is(1));
   }
@@ -601,9 +606,11 @@ public void specAckListenerShouldDoNothingWhenMessageIsOutdated() {
             fsInTest.getName(), fsInTest.getProject().getName()))
         .thenReturn(fsInTest);
 
-    jcs.listenAckFromJobs(newAckMessage("project/invalid", 0, j1.getJob().getId()));
-    jcs.listenAckFromJobs(newAckMessage(fsInTest.getReference(), 0, ""));
-    jcs.listenAckFromJobs(newAckMessage(fsInTest.getReference(), -1, j1.getJob().getId()));
+    jcsWithConsolidation.listenAckFromJobs(
+        newAckMessage("project/invalid", 0, j1.getJob().getId()));
+    jcsWithConsolidation.listenAckFromJobs(newAckMessage(fsInTest.getReference(), 0, ""));
+    jcsWithConsolidation.listenAckFromJobs(
+        newAckMessage(fsInTest.getReference(), -1, j1.getJob().getId()));
 
     assertThat(j1.getDeliveryStatus(), is(STATUS_IN_PROGRESS));
     assertThat(j2.getDeliveryStatus(), is(STATUS_IN_PROGRESS));
@@ -629,13 +636,13 @@ public void specAckListenerShouldUpdateFeatureSetStatus() {
             fsInTest.getName(), fsInTest.getProject().getName()))
         .thenReturn(fsInTest);
 
-    jcs.listenAckFromJobs(
+    jcsWithConsolidation.listenAckFromJobs(
         newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j1.getJob().getId()));
 
     assertThat(j1.getDeliveryStatus(), is(STATUS_DELIVERED));
     assertThat(fsInTest.getStatus(), is(FeatureSetProto.FeatureSetStatus.STATUS_PENDING));
 
-    jcs.listenAckFromJobs(
+    jcsWithConsolidation.listenAckFromJobs(
         newAckMessage(fsInTest.getReference(), fsInTest.getVersion(), j2.getJob().getId()));
 
     assertThat(j2.getDeliveryStatus(), is(STATUS_DELIVERED));
@@ -658,7 +665,7 @@ public void featureSetsShouldBeAllocated() {
     featureSet2.setVersion(5);
     Job job = new Job();
 
-    jcs.allocateFeatureSets(job, ImmutableSet.of(featureSet1));
+    jcsWithConsolidation.allocateFeatureSets(job, ImmutableSet.of(featureSet1));
 
     FeatureSetJobStatus expectedStatus1 = new FeatureSetJobStatus();
     expectedStatus1.setJob(job);
@@ -671,7 +678,7 @@ public void featureSetsShouldBeAllocated() {
     expectedStatus1.setDeliveryStatus(STATUS_DELIVERED);
     job.getFeatureSetJobStatuses().forEach(j -> j.setDeliveryStatus(STATUS_DELIVERED));
 
-    jcs.allocateFeatureSets(job, ImmutableSet.of(featureSet1, featureSet2));
+    jcsWithConsolidation.allocateFeatureSets(job, ImmutableSet.of(featureSet1, featureSet2));
 
     FeatureSetJobStatus expectedStatus2 = new FeatureSetJobStatus();
     expectedStatus2.setJob(job);
@@ -700,7 +707,9 @@ public void shouldCheckStatusOfAbortingJob() {
             source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.of(job));
 
-    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+    List<JobTask> tasks =
+        jcsWithConsolidation.makeJobUpdateTasks(
+            ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));
 
     assertThat("CheckStatus is expected", tasks.get(0) instanceof UpdateJobStatusTask);
   }
@@ -723,7 +732,9 @@ public void shouldUpgradeJobWhenNeeded() {
             source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.of(job));
 
-    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+    List<JobTask> tasks =
+        jcsWithConsolidation.makeJobUpdateTasks(
+            ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));
 
     assertThat("UpgradeTask is expected", tasks.get(0) instanceof UpgradeJobTask);
   }
@@ -733,23 +744,88 @@ public void shouldCreateJobIfNoRunning() {
     Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);
     Store store = TestUtil.createStore("store", Collections.emptyList());
 
-    Job job =
-        Job.builder()
-            .setStatus(JobStatus.ERROR)
-            .setFeatureSetJobStatuses(new HashSet<>())
-            .setExtId("")
-            .build();
-
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
                 source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
-        .thenReturn(Optional.of(job));
+        .thenReturn(Optional.empty());
 
-    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+    List<JobTask> tasks =
+        jcsWithConsolidation.makeJobUpdateTasks(
+            ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));
 
     assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask);
   }
 
+  @Test
+  public void shouldCreateJobPerStore() throws InvalidProtocolBufferException {
+    Store store1 =
+        TestUtil.createStore(
"test-1", List.of(Subscription.newBuilder().setName("*").setProject("*").build())); + Store store2 = + TestUtil.createStore( + "test-2", List.of(Subscription.newBuilder().setName("*").setProject("*").build())); + + Source source = TestUtil.createKafkaSource("kafka", "topic", false); + + when(specService.listStores(any())) + .thenReturn( + ListStoresResponse.newBuilder() + .addStore(store1.toProto()) + .addStore(store2.toProto()) + .build()); + + when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAsc("%", "%")) + .thenReturn(ImmutableList.of(TestUtil.createEmptyFeatureSet("fs", source))); + + when(jobRepository + .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc( + eq(source.getType()), + eq(source.getConfig()), + any(), + eq(JobStatus.getTerminalStates()))) + .thenReturn(Optional.empty()); + + Job expected1 = + Job.builder() + .setSource(source) + .setStores(ImmutableSet.of(store1)) + .setRunner(Runner.DATAFLOW) + .build(); + + Job expected2 = + Job.builder() + .setSource(source) + .setStores(ImmutableSet.of(store2)) + .setRunner(Runner.DATAFLOW) + .build(); + + when(jobManager.startJob(any())).thenReturn(new Job()); + when(jobManager.getRunnerType()).thenReturn(Runner.DATAFLOW); + + jcsWithJobPerStore.Poll(); + + ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); + + verify(jobManager, times(2)).startJob(jobCaptor.capture()); + List actual = jobCaptor.getAllValues(); + + assertThat(actual, containsInAnyOrder(expected1, expected2)); + assertThat( + actual, + hasItem( + hasProperty( + "id", + containsString( + String.format("kafka-%d-to-test-1", Objects.hashCode(source.getConfig())))))); + assertThat( + actual, + hasItem( + hasProperty( + "id", + containsString( + String.format("kafka-%d-to-test-2", Objects.hashCode(source.getConfig())))))); + } + private ConsumerRecord newAckMessage( String key, int version, String jobName) { return new ConsumerRecord<>( diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java index 3fd01cf4f1..7174a5672b 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java +++ b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java @@ -99,8 +99,9 @@ public PCollectionTuple expand(PBegin input) { } private String generateConsumerGroupId(String jobName) { + String[] split = jobName.split("-"); String jobNameWithoutTimestamp = - Arrays.stream(jobName.split("-")).limit(4).collect(Collectors.joining("-")); + Arrays.stream(split).limit(split.length - 1).collect(Collectors.joining("-")); return "feast_import_job_" + jobNameWithoutTimestamp; } }