From 45b74c470a70ef8531947b1f63f5441b60cc5755 Mon Sep 17 00:00:00 2001
From: pyalex
Date: Wed, 24 Jun 2020 16:10:56 +0300
Subject: [PATCH] Job grouping strategy

Introduce a JobGroupingStrategy abstraction that decides how ingestion jobs
are grouped: ConsolidatedJobStrategy runs a single job per source with all
subscribed stores attached, while JobPerStoreStrategy keeps one job per
source-store pair. The strategy is selected via the new
feast.jobs.consolidate-jobs-per-source property.

---
 .../feast/core/config/FeastProperties.java    |  3 +
 .../java/feast/core/config/JobConfig.java     | 14 ++++
 .../core/job/ConsolidatedJobStrategy.java     | 78 ++++++++++++++++++
 .../java/feast/core/job/CreateJobTask.java    | 12 ---
 .../feast/core/job/JobGroupingStrategy.java   | 34 ++++++++
 .../feast/core/job/JobPerStoreStrategy.java   | 80 +++++++++++++++++++
 .../core/service/JobCoordinatorService.java   | 59 +++++---------
 core/src/main/resources/application.yml       |  4 +
 .../java/feast/core/job/JobTasksTest.java     | 10 +--
 .../service/JobCoordinatorServiceTest.java    | 22 +++--
 .../ingestion/transform/ReadFromSource.java   |  3 +-
 11 files changed, 249 insertions(+), 70 deletions(-)
 create mode 100644 core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
 create mode 100644 core/src/main/java/feast/core/job/JobGroupingStrategy.java
 create mode 100644 core/src/main/java/feast/core/job/JobPerStoreStrategy.java

diff --git a/core/src/main/java/feast/core/config/FeastProperties.java b/core/src/main/java/feast/core/config/FeastProperties.java
index 62d0d3bea5..753630732a 100644
--- a/core/src/main/java/feast/core/config/FeastProperties.java
+++ b/core/src/main/java/feast/core/config/FeastProperties.java
@@ -81,6 +81,9 @@ public static class JobProperties {
     /* The active Apache Beam runner name. This name references one instance of the Runner class */
     private String activeRunner;
 
+    /* If true, only one IngestionJob will be created per source, with all subscribed stores attached to it */
+    private Boolean consolidateJobsPerSource = false;
+
     /** List of configured job runners. */
     private List<Runner> runners = new ArrayList<>();
 
diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java
index fd0dc6ef5c..ebcf7c5fef 100644
--- a/core/src/main/java/feast/core/config/JobConfig.java
+++ b/core/src/main/java/feast/core/config/JobConfig.java
@@ -20,7 +20,11 @@
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
 import feast.core.config.FeastProperties.JobProperties;
+import feast.core.dao.JobRepository;
+import feast.core.job.ConsolidatedJobStrategy;
+import feast.core.job.JobGroupingStrategy;
 import feast.core.job.JobManager;
+import feast.core.job.JobPerStoreStrategy;
 import feast.core.job.dataflow.DataflowJobManager;
 import feast.core.job.direct.DirectJobRegistry;
 import feast.core.job.direct.DirectRunnerJobManager;
@@ -64,6 +68,16 @@ public IngestionJobProto.SpecsStreamingUpdateConfig createSpecsStreamingUpdateCo
         .build();
   }
 
+  @Bean
+  public JobGroupingStrategy getJobGroupingStrategy(
+      FeastProperties feastProperties, JobRepository jobRepository) {
+    if (feastProperties.getJobs().getConsolidateJobsPerSource()) {
+      return new ConsolidatedJobStrategy(jobRepository);
+    } else {
+      return new JobPerStoreStrategy(jobRepository);
+    }
+  }
+
   /**
    * Get a JobManager according to the runner type and Dataflow configuration.
    *
diff --git a/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
new file mode 100644
index 0000000000..b407e29a33
--- /dev/null
+++ b/core/src/main/java/feast/core/job/ConsolidatedJobStrategy.java
@@ -0,0 +1,78 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.job;
+
+import feast.core.dao.JobRepository;
+import feast.core.model.Job;
+import feast.core.model.JobStatus;
+import feast.core.model.Source;
+import feast.core.model.Store;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+
+public class ConsolidatedJobStrategy implements JobGroupingStrategy {
+  private final JobRepository jobRepository;
+
+  public ConsolidatedJobStrategy(JobRepository jobRepository) {
+    this.jobRepository = jobRepository;
+  }
+
+  @Override
+  public Job getOrCreateJob(Source source, Set<Store> stores) {
+    return jobRepository
+        .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+            source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
+        .orElseGet(
+            () ->
+                Job.builder()
+                    .setSource(source)
+                    .setStores(stores)
+                    .setFeatureSetJobStatuses(new HashSet<>())
+                    .build());
+  }
+
+  @Override
+  public String createJobId(Job job) {
+    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
+    String jobId =
+        String.format(
+            "%s-%d-%s",
+            job.getSource().getTypeString(),
+            Objects.hashCode(job.getSource().getConfig()),
+            dateSuffix);
+    return jobId.replaceAll("_store", "-").toLowerCase();
+  }
+
+  @Override
+  public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
+      Stream<Pair<Source, Store>> stream) {
+    Map<Source, Set<Store>> map =
+        stream.collect(
+            Collectors.groupingBy(
+                Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
+
+    return map.entrySet().stream()
+        .map(e -> Pair.of(e.getKey(), e.getValue()))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/core/src/main/java/feast/core/job/CreateJobTask.java b/core/src/main/java/feast/core/job/CreateJobTask.java
index 867dcfc02a..d4d6a8b2ef 100644
--- a/core/src/main/java/feast/core/job/CreateJobTask.java
+++ b/core/src/main/java/feast/core/job/CreateJobTask.java
@@ -20,8 +20,6 @@
 import feast.core.model.Job;
 import feast.core.model.JobStatus;
 import feast.core.model.Source;
-import java.time.Instant;
-import java.util.Objects;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.Setter;
@@ -43,12 +41,10 @@ public class CreateJobTask implements JobTask {
 
   @Override
   public Job call() {
-    String jobId = createJobId(job.getSource());
     String runnerName = jobManager.getRunnerType().toString();
 
     job.setRunner(jobManager.getRunnerType());
     job.setStatus(JobStatus.PENDING);
-    job.setId(jobId);
 
     try {
       JobTask.logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);
submitting to %s", runnerName); @@ -73,12 +69,4 @@ public Job call() { return job; } } - - String createJobId(Source source) { - String dateSuffix = String.valueOf(Instant.now().toEpochMilli()); - String jobId = - String.format( - "%s-%d-%s", source.getTypeString(), Objects.hashCode(source.getConfig()), dateSuffix); - return jobId.replaceAll("_store", "-").toLowerCase(); - } } diff --git a/core/src/main/java/feast/core/job/JobGroupingStrategy.java b/core/src/main/java/feast/core/job/JobGroupingStrategy.java new file mode 100644 index 0000000000..82cc54c189 --- /dev/null +++ b/core/src/main/java/feast/core/job/JobGroupingStrategy.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job; + +import feast.core.model.Job; +import feast.core.model.Source; +import feast.core.model.Store; +import java.util.Set; +import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; + +public interface JobGroupingStrategy { + /** Get the non terminated ingestion job ingesting for given source and stores. */ + public Job getOrCreateJob(Source source, Set stores); + + public String createJobId(Job job); + + public Iterable>> collectSingleJobInput( + Stream> stream); +} diff --git a/core/src/main/java/feast/core/job/JobPerStoreStrategy.java b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java new file mode 100644 index 0000000000..211e0c6c0e --- /dev/null +++ b/core/src/main/java/feast/core/job/JobPerStoreStrategy.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package feast.core.job;
+
+import com.google.common.collect.Lists;
+import feast.core.dao.JobRepository;
+import feast.core.model.Job;
+import feast.core.model.JobStatus;
+import feast.core.model.Source;
+import feast.core.model.Store;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+
+public class JobPerStoreStrategy implements JobGroupingStrategy {
+  private final JobRepository jobRepository;
+
+  public JobPerStoreStrategy(JobRepository jobRepository) {
+    this.jobRepository = jobRepository;
+  }
+
+  @Override
+  public Job getOrCreateJob(Source source, Set<Store> stores) {
+    ArrayList<Store> storesList = Lists.newArrayList(stores);
+    if (storesList.size() != 1) {
+      throw new RuntimeException("Only one store is acceptable in JobPerStore Strategy");
+    }
+    Store store = storesList.get(0);
+
+    return jobRepository
+        .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
+            source.getType(), source.getConfig(), store.getName(), JobStatus.getTerminalStates())
+        .orElseGet(
+            () ->
+                Job.builder()
+                    .setSource(source)
+                    .setStoreName(store.getName())
+                    .setStores(stores)
+                    .setFeatureSetJobStatuses(new HashSet<>())
+                    .build());
+  }
+
+  @Override
+  public String createJobId(Job job) {
+    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
+    String jobId =
+        String.format(
+            "%s-%d-to-%s-%s",
+            job.getSource().getTypeString(),
+            Objects.hashCode(job.getSource().getConfig()),
+            job.getStoreName(),
+            dateSuffix);
+    return jobId.replaceAll("_store", "-").toLowerCase();
+  }
+
+  @Override
+  public Iterable<Pair<Source, Set<Store>>> collectSingleJobInput(
+      Stream<Pair<Source, Store>> stream) {
+    return stream.map(p -> Pair.of(p.getLeft(), Set.of(p.getRight()))).collect(Collectors.toList());
+  }
+}
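Note: the two strategies differ mainly in how they fold the distinct (source, store) pairs collected by JobCoordinatorService into per-job inputs. ConsolidatedJobStrategy groups every store subscribed to a source into a single job, while JobPerStoreStrategy emits one job input per pair with a singleton store set. The following standalone sketch mirrors the two collectSingleJobInput implementations above; it is illustration only, with plain Strings standing in for Feast's Source and Store types and made-up source/store names.

import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class GroupingSketch {
  public static void main(String[] args) {
    // Distinct (source, store) pairs, as getSourceToStoreMappings() would produce them.
    List<SimpleEntry<String, String>> pairs =
        List.of(
            new SimpleEntry<>("kafka:9092/topicA", "online-store"),
            new SimpleEntry<>("kafka:9092/topicA", "warehouse-store"),
            new SimpleEntry<>("kafka:9092/topicB", "online-store"));

    // Consolidated grouping: one entry per source, all of its stores collected into one set.
    Map<String, Set<String>> consolidated =
        pairs.stream()
            .collect(
                Collectors.groupingBy(
                    SimpleEntry::getKey,
                    Collectors.mapping(SimpleEntry::getValue, Collectors.toSet())));
    System.out.println(consolidated);
    // e.g. {kafka:9092/topicA=[online-store, warehouse-store], kafka:9092/topicB=[online-store]}

    // Per-store grouping: one entry per (source, store) pair, each with a singleton store set.
    List<SimpleEntry<String, Set<String>>> perStore =
        pairs.stream()
            .map(p -> new SimpleEntry<>(p.getKey(), Set.of(p.getValue())))
            .collect(Collectors.toList());
    System.out.println(perStore);
    // e.g. [kafka:9092/topicA=[online-store], kafka:9092/topicA=[warehouse-store], kafka:9092/topicB=[online-store]]
  }
}

Output order may vary since the consolidated variant collects into unordered maps and sets; the real strategies hand the grouped pairs back to the coordinator, which then looks up or creates one ingestion job per element.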
diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java
index 7ed3f61807..f5defb5d16 100644
--- a/core/src/main/java/feast/core/service/JobCoordinatorService.java
+++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java
@@ -19,12 +19,10 @@
 import static feast.core.model.FeatureSet.parseReference;
 
 import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
 import feast.core.config.FeastProperties;
 import feast.core.config.FeastProperties.JobProperties;
 import feast.core.dao.FeatureSetRepository;
 import feast.core.dao.JobRepository;
-import feast.core.dao.SourceRepository;
 import feast.core.job.*;
 import feast.core.model.*;
 import feast.core.model.FeatureSet;
@@ -39,6 +37,7 @@
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -57,28 +56,28 @@ public class JobCoordinatorService {
   private final JobRepository jobRepository;
   private final FeatureSetRepository featureSetRepository;
-  private final SourceRepository sourceRepository;
   private final SpecService specService;
   private final JobManager jobManager;
   private final JobProperties jobProperties;
+  private final JobGroupingStrategy groupingStrategy;
   private final KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher;
 
   @Autowired
   public JobCoordinatorService(
       JobRepository jobRepository,
       FeatureSetRepository featureSetRepository,
-      SourceRepository sourceRepository,
       SpecService specService,
       JobManager jobManager,
       FeastProperties feastProperties,
+      JobGroupingStrategy groupingStrategy,
       KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
     this.jobRepository = jobRepository;
     this.featureSetRepository = featureSetRepository;
-    this.sourceRepository = sourceRepository;
     this.specService = specService;
     this.jobManager = jobManager;
     this.jobProperties = feastProperties.getJobs();
     this.specPublisher = specPublisher;
+    this.groupingStrategy = groupingStrategy;
   }
 
   /**
    *
@@ -94,9 +93,9 @@ public JobCoordinatorService(
    */
   @Transactional
   @Scheduled(fixedDelayString = "${feast.jobs.polling_interval_milliseconds}")
-  public void Poll() throws InvalidProtocolBufferException {
+  public void Poll() {
     log.info("Polling for new jobs...");
-    Map<Source, Set<Store>> sourceStoreMappings = getSourceToStoreMappings();
+    Iterable<Pair<Source, Set<Store>>> sourceStoreMappings = getSourceToStoreMappings();
     List<JobTask> jobUpdateTasks = makeJobUpdateTasks(sourceStoreMappings);
 
     if (jobUpdateTasks.isEmpty()) {
@@ -137,18 +136,18 @@ void startOrUpdateJobs(List<JobTask> tasks) {
    * stop ingestion jobs its the required ingestions to maintained ingestion jobs are already
    * RUNNING.
    *
-   * @param sourceStoresMappings a list of source to store pairs where ingestion jobs would have to
+   * @param sourceToStores an iterable of source-to-stores pairs for which ingestion jobs have to
    *     be maintained for ingestion to work correctly.
    * @return list of job update tasks required to reconcile the current ingestion jobs to the state
    *     that is defined by sourceStoreMap.
    */
-  List<JobTask> makeJobUpdateTasks(Map<Source, Set<Store>> sourceStoresMappings) {
+  List<JobTask> makeJobUpdateTasks(Iterable<Pair<Source, Set<Store>>> sourceToStores) {
     List<JobTask> jobTasks = new LinkedList<>();
     // Ensure a running job for each source to store mapping
     List<Job> activeJobs = new LinkedList<>();
     boolean isSafeToStopJobs = true;
 
-    for (Map.Entry<Source, Set<Store>> mapping : sourceStoresMappings.entrySet()) {
+    for (Pair<Source, Set<Store>> mapping : sourceToStores) {
       Source source = mapping.getKey();
       Set<Store> stores = mapping.getValue();
       Set<FeatureSet> featureSets =
@@ -156,7 +155,7 @@ List<JobTask> makeJobUpdateTasks(Map<Source, Set<Store>> sourceStoresMappings)
           stores.stream()
               .flatMap(s -> getFeatureSetsForStore(s).stream())
               .collect(Collectors.toSet());
 
-      Job job = getOrCreateJob(source, stores);
+      Job job = groupingStrategy.getOrCreateJob(source, stores);
       if (job.isDeployed()) {
         if (!job.isRunning()) {
@@ -175,6 +174,8 @@ List<JobTask> makeJobUpdateTasks(Map<Source, Set<Store>> sourceStoresMappings)
         jobTasks.add(UpdateJobStatusTask.builder().setJob(job).setJobManager(jobManager).build());
       }
     } else {
+      job.setId(groupingStrategy.createJobId(job));
+
       jobTasks.add(CreateJobTask.builder().setJob(job).setJobManager(jobManager).build());
     }
 
@@ -250,21 +251,6 @@ void allocateFeatureSets(Job job, Set<FeatureSet> featureSets) {
     }
   }
 
-  /** Get the non terminated ingestion job ingesting for given source. */
-  public Job getOrCreateJob(Source source, Set<Store> stores) {
-    return jobRepository
-        .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
-            source.getType(), source.getConfig(), null, JobStatus.getTerminalStates())
-        .orElseGet(
-            () ->
-                Job.builder()
-                    .setRunner(jobManager.getRunnerType())
-                    .setSource(source)
-                    .setStores(stores)
-                    .setFeatureSetJobStatuses(new HashSet<>())
-                    .build());
-  }
-
   /** Get running extra ingestion jobs that have ids not in keepJobs */
   @Transactional
   private Collection<Job> getExtraJobs(List<Job> keepJobs) {
@@ -281,7 +267,7 @@ private Collection<Job> getExtraJobs(List<Job> keepJobs) {
    *
    * @return a Map from source to stores.
*/ - private Map> getSourceToStoreMappings() { + private Iterable>> getSourceToStoreMappings() { ListStoresResponse listStoresResponse = specService.listStores(Filter.newBuilder().build()); List stores = listStoresResponse.getStoreList().stream() @@ -291,16 +277,15 @@ private Map> getSourceToStoreMappings() { // build mapping from source to store. // compile a set of sources via subscribed FeatureSets of stores. - return stores.stream() - .flatMap( - store -> - getFeatureSetsForStore(store).stream() - .map(FeatureSet::getSource) - .map(source -> Pair.of(source, store))) - .distinct() - .collect( - Collectors.groupingBy( - Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet()))); + Stream> distinctPairs = + stores.stream() + .flatMap( + store -> + getFeatureSetsForStore(store).stream() + .map(FeatureSet::getSource) + .map(source -> Pair.of(source, store))) + .distinct(); + return groupingStrategy.collectSingleJobInput(distinctPairs); } /** diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index 57728e0dc8..b82e5db0b4 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -60,6 +60,10 @@ feast: # Port of the metrics sink. port: 9125 + # if true one job per source with many stores would be created + # if false one job per source-store pair would be created + consolidate-jobs-per-source: false + stream: # Feature stream type. Only kafka is supported. type: kafka diff --git a/core/src/test/java/feast/core/job/JobTasksTest.java b/core/src/test/java/feast/core/job/JobTasksTest.java index 950bd23d1e..e0c0cd182c 100644 --- a/core/src/test/java/feast/core/job/JobTasksTest.java +++ b/core/src/test/java/feast/core/job/JobTasksTest.java @@ -18,8 +18,6 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -107,10 +105,9 @@ TerminateJobTask makeTerminateTask(Job currentJob) { @Test public void shouldCreateJobIfNotPresent() { - Job expectedInput = makeJob("", Collections.emptyList(), JobStatus.PENDING); + Job expectedInput = makeJob("ext", Collections.emptyList(), JobStatus.PENDING); - CreateJobTask task = spy(makeCreateTask(expectedInput)); - doReturn("job").when(task).createJobId(source); + CreateJobTask task = makeCreateTask(expectedInput); Job expected = makeJob("ext", Collections.emptyList(), JobStatus.RUNNING); @@ -135,8 +132,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { Job expectedInput = makeJob("", Collections.emptyList(), JobStatus.PENDING); - CreateJobTask jobUpdateTask = spy(makeCreateTask(expectedInput)); - doReturn("job").when(jobUpdateTask).createJobId(source); + CreateJobTask jobUpdateTask = makeCreateTask(expectedInput); Job expected = makeJob("", Collections.emptyList(), JobStatus.ERROR); diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index c19641149b..cbc187bc17 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -27,7 +27,6 @@ import static org.mockito.MockitoAnnotations.initMocks; import com.google.common.collect.ImmutableList; -import 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -54,6 +53,7 @@
 import java.util.*;
 import java.util.concurrent.CancellationException;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Before;
 import org.junit.Rule;
@@ -89,10 +89,10 @@ public void setUp() {
         new JobCoordinatorService(
             jobRepository,
             featureSetRepository,
-            sourceRepository,
             specService,
             jobManager,
             feastProperties,
+            new ConsolidatedJobStrategy(jobRepository),
             kafkaTemplate);
 
     when(kafkaTemplate.sendDefault(any(), any())).thenReturn(new AsyncResult<>(null));
@@ -700,7 +700,8 @@ public void shouldCheckStatusOfAbortingJob() {
                 source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.of(job));
 
-    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+    List<JobTask> tasks =
+        jcs.makeJobUpdateTasks(ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));
     assertThat("CheckStatus is expected", tasks.get(0) instanceof UpdateJobStatusTask);
   }
 
@@ -723,7 +724,8 @@ public void shouldUpgradeJobWhenNeeded() {
                 source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
         .thenReturn(Optional.of(job));
 
-    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+    List<JobTask> tasks =
+        jcs.makeJobUpdateTasks(ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));
     assertThat("UpgradeTask is expected", tasks.get(0) instanceof UpgradeJobTask);
   }
 
@@ -733,19 +735,13 @@ public void shouldCreateJobIfNoRunning() {
     Source source = TestUtil.createKafkaSource("kafka:9092", "topic", false);
     Store store = TestUtil.createStore("store", Collections.emptyList());
 
-    Job job =
-        Job.builder()
-            .setStatus(JobStatus.ERROR)
-            .setFeatureSetJobStatuses(new HashSet<>())
-            .setExtId("")
-            .build();
-
     when(jobRepository
             .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
                 source.getType(), source.getConfig(), null, JobStatus.getTerminalStates()))
-        .thenReturn(Optional.of(job));
+        .thenReturn(Optional.empty());
 
-    List<JobTask> tasks = jcs.makeJobUpdateTasks(ImmutableMap.of(source, ImmutableSet.of(store)));
+    List<JobTask> tasks =
+        jcs.makeJobUpdateTasks(ImmutableList.of(Pair.of(source, ImmutableSet.of(store))));
     assertThat("CreateTask is expected", tasks.get(0) instanceof CreateJobTask);
   }
 
diff --git a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
index 3fd01cf4f1..7174a5672b 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/ReadFromSource.java
@@ -99,8 +99,9 @@ public PCollectionTuple expand(PBegin input) {
   }
 
   private String generateConsumerGroupId(String jobName) {
+    String[] split = jobName.split("-");
     String jobNameWithoutTimestamp =
-        Arrays.stream(jobName.split("-")).limit(4).collect(Collectors.joining("-"));
+        Arrays.stream(split).limit(split.length - 1).collect(Collectors.joining("-"));
     return "feast_import_job_" + jobNameWithoutTimestamp;
   }
 }
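Note on the ReadFromSource change above: job ids now come in two shapes -- the consolidated strategy produces <sourceType>-<configHash>-<timestamp> and the per-store strategy produces <sourceType>-<configHash>-to-<storeName>-<timestamp> -- so generateConsumerGroupId drops only the trailing dash-separated token (the timestamp) instead of keeping a fixed four segments. A standalone sketch of that derivation; the job ids below are made up for illustration, not taken from a real deployment:

import java.util.Arrays;
import java.util.stream.Collectors;

public class ConsumerGroupSketch {
  // Mirrors the updated generateConsumerGroupId(): drop only the last
  // dash-separated token (the millisecond timestamp suffix), whatever the
  // overall shape of the job id is.
  static String consumerGroupId(String jobName) {
    String[] split = jobName.split("-");
    String withoutTimestamp =
        Arrays.stream(split).limit(split.length - 1).collect(Collectors.joining("-"));
    return "feast_import_job_" + withoutTimestamp;
  }

  public static void main(String[] args) {
    // Consolidated-style id: <sourceType>-<configHash>-<timestamp>
    System.out.println(consumerGroupId("kafka-123456-1593000000000"));
    // -> feast_import_job_kafka-123456

    // Per-store-style id: <sourceType>-<configHash>-to-<storeName>-<timestamp>
    System.out.println(consumerGroupId("kafka-123456-to-online-1593000000000"));
    // -> feast_import_job_kafka-123456-to-online
  }
}

Because only the timestamp is stripped, the derived Kafka consumer group stays stable across job restarts for both id shapes.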